You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:03 UTC

[06/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
deleted file mode 100644
index 6c71036..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.metadata.entitytupletranslators;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.builders.IARecordBuilder;
-import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
-import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.AUnorderedList;
-import edu.uci.ics.asterix.om.base.IACursor;
-import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-/**
- * Translates a Dataset metadata entity to an ITupleReference and vice versa.
- */
-public class FeedActivityTupleTranslator extends AbstractTupleTranslator<FeedActivity> {
-    // Field indexes of serialized FeedActivity in a tuple.
-    // Key field.
-    public static final int FEED_ACTIVITY_ACTIVITY_DATAVERSE_NAME_FIELD_INDEX = 0;
-
-    public static final int FEED_ACTIVITY_ACTIVITY_FEED_NAME_FIELD_INDEX = 1;
-
-    public static final int FEED_ACTIVITY_ACTIVITY_DATASET_NAME_FIELD_INDEX = 2;
-
-    public static final int FEED_ACTIVITY_ACTIVITY_ID_FIELD_INDEX = 3;
-
-    // Payload field containing serialized FeedActivity.
-    public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 4;
-
-    @SuppressWarnings("unchecked")
-    private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
-    private AMutableInt32 aInt32;
-    protected ISerializerDeserializer<AInt32> aInt32Serde;
-
-    @SuppressWarnings("unchecked")
-    public FeedActivityTupleTranslator(boolean getTuple) {
-        super(getTuple, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET.getFieldCount());
-        aInt32 = new AMutableInt32(-1);
-        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-    }
-
-    @Override
-    public FeedActivity getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
-        byte[] serRecord = frameTuple.getFieldData(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
-        int recordStartOffset = frameTuple.getFieldStart(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
-        int recordLength = frameTuple.getFieldLength(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
-        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
-        DataInput in = new DataInputStream(stream);
-        ARecord feedActivityRecord = (ARecord) recordSerDes.deserialize(in);
-        return createFeedActivityFromARecord(feedActivityRecord);
-    }
-
-    private FeedActivity createFeedActivityFromARecord(ARecord feedActivityRecord) {
-
-        String dataverseName = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
-        String feedName = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX)).getStringValue();
-        String datasetName = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
-        int activityId = ((AInt32) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX)).getIntegerValue();
-        String feedActivityType = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX)).getStringValue();
-
-        IACursor cursor = ((AUnorderedList) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX)).getCursor();
-        Map<String, String> activityDetails = new HashMap<String, String>();
-        String key;
-        String value;
-        while (cursor.next()) {
-            ARecord field = (ARecord) cursor.get();
-            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
-            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
-            activityDetails.put(key, value);
-        }
-
-        String feedActivityTimestamp = ((AString) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX))
-                .getStringValue();
-
-        FeedActivity fa = new FeedActivity(dataverseName, feedName, datasetName,
-                FeedActivityType.valueOf(feedActivityType), activityDetails);
-        fa.setLastUpdatedTimestamp(feedActivityTimestamp);
-        fa.setActivityId(activityId);
-        return fa;
-    }
-
-    @Override
-    public ITupleReference getTupleFromMetadataEntity(FeedActivity feedActivity) throws IOException, MetadataException {
-        // write the key in the first three fields of the tuple
-        ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
-
-        tupleBuilder.reset();
-        aString.setValue(feedActivity.getDataverseName());
-        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-
-        aString.setValue(feedActivity.getFeedName());
-        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-
-        aString.setValue(feedActivity.getDatasetName());
-        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-
-        aInt32.setValue(feedActivity.getActivityId());
-        int32Serde.serialize(aInt32, tupleBuilder.getDataOutput());
-        tupleBuilder.addFieldEndOffset();
-        // write the pay-load in the 2nd field of the tuple
-
-        recordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
-
-        // write field 0
-        fieldValue.reset();
-        aString.setValue(feedActivity.getDataverseName());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
-
-        // write field 1
-        fieldValue.reset();
-        aString.setValue(feedActivity.getFeedName());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_FEED_NAME_FIELD_INDEX, fieldValue);
-
-        // write field 2
-        fieldValue.reset();
-        aString.setValue(feedActivity.getDatasetName());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
-
-        // write field 3
-        fieldValue.reset();
-        aInt32.setValue(feedActivity.getActivityId());
-        int32Serde.serialize(aInt32, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX, fieldValue);
-
-        // write field 4
-        fieldValue.reset();
-        aString.setValue(feedActivity.getFeedActivityType().name());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX, fieldValue);
-
-        // write field 5
-        Map<String, String> properties = feedActivity.getFeedActivityDetails();
-        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-        listBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX]);
-        for (Map.Entry<String, String> property : properties.entrySet()) {
-            String name = property.getKey();
-            String value = property.getValue();
-            itemValue.reset();
-            writePropertyTypeRecord(name, value, itemValue.getDataOutput());
-            listBuilder.addItem(itemValue);
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX, fieldValue);
-
-        // write field 6
-        fieldValue.reset();
-        aString.setValue(Calendar.getInstance().getTime().toString());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-        // write record
-        try {
-            recordBuilder.write(tupleBuilder.getDataOutput(), true);
-        } catch (AsterixException e) {
-            throw new MetadataException(e);
-        }
-        tupleBuilder.addFieldEndOffset();
-
-        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
-        return tuple;
-    }
-
-    public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
-        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
-        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
-        propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_DETAILS_RECORDTYPE);
-        AMutableString aString = new AMutableString("");
-        ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(BuiltinType.ASTRING);
-
-        // write field 0
-        fieldValue.reset();
-        aString.setValue(name);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(0, fieldValue);
-
-        // write field 1
-        fieldValue.reset();
-        aString.setValue(value);
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        propertyRecordBuilder.addField(1, fieldValue);
-
-        try {
-            propertyRecordBuilder.write(out, true);
-        } catch (IOException | AsterixException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
index def0938..32ed123 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedTupleTranslator.java
@@ -25,8 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.OrderedListBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -34,8 +34,9 @@ import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.om.base.AInt32;
-import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.base.ARecord;
@@ -65,14 +66,9 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
     @SuppressWarnings("unchecked")
     private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(MetadataRecordTypes.FEED_RECORDTYPE);
-    private AMutableInt32 aInt32;
-    protected ISerializerDeserializer<AInt32> aInt32Serde;
 
-    @SuppressWarnings("unchecked")
     public FeedTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.FEED_DATASET.getFieldCount());
-        aInt32 = new AMutableInt32(-1);
-        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
     }
 
     @Override
@@ -92,49 +88,64 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
                 .getValueByPos(MetadataRecordTypes.FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
         String feedName = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_NAME_FIELD_INDEX))
                 .getStringValue();
-        String adapterName = ((AString) feedRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX)).getStringValue();
-
-        IACursor cursor = ((AUnorderedList) feedRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX)).getCursor();
-        String key;
-        String value;
-        Map<String, String> adapterConfiguration = new HashMap<String, String>();
-        while (cursor.next()) {
-            ARecord field = (ARecord) cursor.get();
-            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
-            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
-            adapterConfiguration.put(key, value);
-        }
 
         Object o = feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX);
         FunctionSignature signature = null;
         if (!(o instanceof ANull)) {
-            String functionIdentifier = ((AString) o).getStringValue();
-            String[] qnameComponents = functionIdentifier.split("\\.");
-            String functionDataverse;
-            String functionName;
-            if (qnameComponents.length == 2) {
-                functionDataverse = qnameComponents[0];
-                functionName = qnameComponents[1];
-            } else {
-                functionDataverse = dataverseName;
-                functionName = qnameComponents[0];
+            String functionName = ((AString) o).getStringValue();
+            signature = new FunctionSignature(dataverseName, functionName, 1);
+        }
+
+        String feedType = ((AString) feedRecord.getValueByPos(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX))
+                .getStringValue();
+
+        FeedType feedTypeEnum = FeedType.valueOf(feedType.toUpperCase());
+        switch (feedTypeEnum) {
+            case PRIMARY: {
+                ARecord feedTypeDetailsRecord = (ARecord) feedRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX);
+                String adapterName = ((AString) feedTypeDetailsRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX))
+                        .getStringValue();
+
+                IACursor cursor = ((AUnorderedList) feedTypeDetailsRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX))
+                        .getCursor();
+                String key;
+                String value;
+                Map<String, String> adaptorConfiguration = new HashMap<String, String>();
+                while (cursor.next()) {
+                    ARecord field = (ARecord) cursor.get();
+                    key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+                            .getStringValue();
+                    value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+                            .getStringValue();
+                    adaptorConfiguration.put(key, value);
+                }
+                feed = new PrimaryFeed(dataverseName, feedName, adapterName, adaptorConfiguration, signature);
+
             }
+                break;
+            case SECONDARY: {
+                ARecord feedTypeDetailsRecord = (ARecord) feedRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX);
+
+                String sourceFeedName = ((AString) feedTypeDetailsRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX))
+                        .getStringValue();
+
+                feed = new SecondaryFeed(dataverseName, feedName, sourceFeedName, signature);
 
-            String[] nameComponents = functionName.split("@");
-            signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
+            }
+                break;
         }
 
-        feed = new Feed(dataverseName, feedName, adapterName, adapterConfiguration, signature);
         return feed;
     }
 
     @Override
     public ITupleReference getTupleFromMetadataEntity(Feed feed) throws IOException, MetadataException {
-        // write the key in the first three fields of the tuple
-        ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
-
+        // write the key in the first two fields of the tuple
         tupleBuilder.reset();
         aString.setValue(feed.getDataverseName());
         stringSerde.serialize(aString, tupleBuilder.getDataOutput());
@@ -160,35 +171,23 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
 
         // write field 2
         fieldValue.reset();
-        aString.setValue(feed.getAdapterName());
-        stringSerde.serialize(aString, fieldValue.getDataOutput());
-        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX, fieldValue);
-
-        // write field 3 (adapterConfiguration)
-        Map<String, String> adapterConfiguration = feed.getAdapterConfiguration();
-        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-        listBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX]);
-        for (Map.Entry<String, String> property : adapterConfiguration.entrySet()) {
-            String name = property.getKey();
-            String value = property.getValue();
-            itemValue.reset();
-            writePropertyTypeRecord(name, value, itemValue.getDataOutput());
-            listBuilder.addItem(itemValue);
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX, fieldValue);
-
-        // write field 4
-        fieldValue.reset();
         if (feed.getAppliedFunction() != null) {
-            aString.setValue(feed.getAppliedFunction().toString());
+            aString.setValue(feed.getAppliedFunction().getName());
             stringSerde.serialize(aString, fieldValue.getDataOutput());
             recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
         }
 
-        // write field 5
+        // write field 3
+        fieldValue.reset();
+        aString.setValue(feed.getFeedType().name().toUpperCase());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_FEED_TYPE_FIELD_INDEX, fieldValue);
+
+        // write field 4/5
+        fieldValue.reset();
+        writeFeedTypeDetailsRecordType(recordBuilder, feed, fieldValue);
+
+        // write field 6
         fieldValue.reset();
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
@@ -206,6 +205,85 @@ public class FeedTupleTranslator extends AbstractTupleTranslator<Feed> {
         return tuple;
     }
 
+    @SuppressWarnings("unchecked")
+    private void writeFeedTypeDetailsRecordType(IARecordBuilder recordBuilder, Feed feed,
+            ArrayBackedValueStorage fieldValue) throws HyracksDataException {
+        // Serializes the feed-type-specific details record into fieldValue and adds it to recordBuilder.
+        switch (feed.getFeedType()) {
+            case PRIMARY: {
+                PrimaryFeed primaryFeed = (PrimaryFeed) feed;
+
+                IARecordBuilder primaryDetailsRecordBuilder = new RecordBuilder();
+                OrderedListBuilder listBuilder = new OrderedListBuilder();
+                ArrayBackedValueStorage primaryRecordfieldValue = new ArrayBackedValueStorage();
+                ArrayBackedValueStorage primaryRecordItemValue = new ArrayBackedValueStorage();
+                primaryDetailsRecordBuilder.reset(MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE);
+
+                AMutableString aString = new AMutableString("");
+                ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(BuiltinType.ASTRING);
+
+                // write field 0 of the primary details record: the adaptor name
+                fieldValue.reset();
+                aString.setValue(primaryFeed.getAdaptorName());
+                stringSerde.serialize(aString, primaryRecordfieldValue.getDataOutput());
+                primaryDetailsRecordBuilder.addField(
+                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX,
+                        primaryRecordfieldValue);
+
+                // write field 1 of the primary details record: the adaptor configuration, one property record per entry
+                listBuilder
+                        .reset((AUnorderedListType) MetadataRecordTypes.PRIMARY_FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX]);
+                for (Map.Entry<String, String> property : primaryFeed.getAdaptorConfiguration().entrySet()) {
+                    String name = property.getKey();
+                    String value = property.getValue();
+                    primaryRecordItemValue.reset();
+                    writePropertyTypeRecord(name, value, primaryRecordItemValue.getDataOutput());
+                    listBuilder.addItem(primaryRecordItemValue);
+                }
+                primaryRecordfieldValue.reset();
+                listBuilder.write(primaryRecordfieldValue.getDataOutput(), true);
+                primaryDetailsRecordBuilder.addField(
+                        MetadataRecordTypes.FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX,
+                        primaryRecordfieldValue);
+
+                try {
+                    primaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
+                } catch (IOException | AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+
+                recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
+            }
+                break;
+
+            case SECONDARY:
+                SecondaryFeed secondaryFeed = (SecondaryFeed) feed;
+
+                IARecordBuilder secondaryDetailsRecordBuilder = new RecordBuilder();
+                ArrayBackedValueStorage secondaryFieldValue = new ArrayBackedValueStorage();
+                secondaryDetailsRecordBuilder.reset(MetadataRecordTypes.SECONDARY_FEED_DETAILS_RECORDTYPE);
+
+                // write field 0: the source feed name (aString/stringSerde here are not the locals declared in the PRIMARY case)
+                fieldValue.reset();
+                aString.setValue(secondaryFeed.getSourceFeedName());
+                stringSerde.serialize(aString, secondaryFieldValue.getDataOutput());
+                secondaryDetailsRecordBuilder.addField(
+                        MetadataRecordTypes.FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX,
+                        secondaryFieldValue);
+
+                try {
+                    secondaryDetailsRecordBuilder.write(fieldValue.getDataOutput(), true);
+                } catch (IOException | AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                recordBuilder.addField(MetadataRecordTypes.FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX, fieldValue);
+                break;
+        }
+
+    }
+
+    @SuppressWarnings("unchecked")
     public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
         IARecordBuilder propertyRecordBuilder = new RecordBuilder();
         ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
new file mode 100644
index 0000000..7fc2e7e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/IAdapterFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.external;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
+ * Acts as a marker interface indicating that the implementation provides functionality
+ * for creating an adapter.
+ */
+public interface IAdapterFactory extends Serializable {
+
+    public static final String KEY_TYPE_NAME = "type-name";
+
+    public enum SupportedOperation {
+        READ,
+        WRITE,
+        READ_WRITE
+    }
+
+    /**
+     * Returns the type of adapter indicating if the adapter can be used for
+     * reading from an external data source or writing to an external data
+     * source or can be used for both purposes.
+     * 
+     * @see SupportedOperation
+     * @return the operation(s) the adapter supports
+     */
+    public SupportedOperation getSupportedOperations();
+
+    /**
+     * Returns the display name corresponding to the Adapter type that is created by the factory.
+     * 
+     * @return the display name
+     */
+    public String getName();
+
+    /**
+     * Gets the partition constraint. A partition constraint can be a
+     * requirement to execute at a particular location or could be a cardinality
+     * constraint indicating the number of instances that need to run in
+     * parallel. For example, an IDatasourceAdapter implementation written for data
+     * residing on the local file system of a node cannot run on any other node
+     * and thus has a location partition constraint. The location partition
+     * constraint can be expressed as a node IP address or a node controller id.
+     * In the former case, the IP address is translated to a node controller id
+     * running on the node with the given IP address.
+     */
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
+
+    /**
+     * Creates an instance of IDatasourceAdapter.
+     * 
+     * @param ctx the Hyracks task context
+     * @param partition the partition the adapter is created for (NOTE(review): confirm exact meaning with callers)
+     * @return An instance of IDatasourceAdapter.
+     * @throws Exception
+     */
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
+
+    /**
+     * @param configuration string key/value configuration passed to the factory
+     * @param outputType record type passed for the adapter output (presumably what getAdapterOutputType reports; confirm)
+     * @throws Exception
+     */
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
+
+    /**
+     * Gets the record type associated with the output of the adapter
+     * 
+     * @return the record type of the adapter's output
+     */
+    public ARecordType getAdapterOutputType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
index 9e8e5f7..3bc402b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,23 +29,11 @@ public abstract class AbstractDatasourceAdapter implements IDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
 
+    public static final String KEY_PARSER_FACTORY = "parser";
+
     protected Map<String, Object> configuration;
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
 
-    protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
-
-    public static final String KEY_FORMAT = "format";
-    public static final String KEY_PARSER_FACTORY = "parser";
-    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
-    public static final String FORMAT_ADM = "adm";
-
-    private static Map<String, Object> initializeFormatParserFactoryMap() {
-        Map<String, Object> map = new HashMap<String, Object>();
-        map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
-        map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
-        return map;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
new file mode 100644
index 0000000..86ca550
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+
+
+public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    protected FeedPolicyEnforcer policyEnforcer;
+
+    public FeedPolicyEnforcer getPolicyEnforcer() {
+        return policyEnforcer;
+    }
+
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+        this.policyEnforcer = policyEnforcer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
new file mode 100644
index 0000000..6bd726c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterExecutor.java
@@ -0,0 +1,56 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager.State;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+
+public class AdapterExecutor implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
+
+    private final DistributeFeedFrameWriter writer;
+
+    private final IFeedAdapter adapter;
+
+    private final IAdapterRuntimeManager adapterManager;
+
+    public AdapterExecutor(int partition, DistributeFeedFrameWriter writer, IFeedAdapter adapter,
+            IAdapterRuntimeManager adapterManager) {
+        this.writer = writer;
+        this.adapter = adapter;
+        this.adapterManager = adapterManager;
+    }
+
+    @Override
+    public void run() {
+        int partition = adapterManager.getPartition();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting ingestion for partition:" + partition);
+        }
+        boolean continueIngestion = true;
+        boolean failedIngestion = false;
+        while (continueIngestion) {
+            try {
+                adapter.start(partition, writer);
+                continueIngestion = false;
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Exception during feed ingestion " + e.getMessage());
+                    e.printStackTrace();
+                }
+                continueIngestion = adapter.handleException(e);
+                failedIngestion = !continueIngestion;
+            }
+        }
+
+        adapterManager.setState(failedIngestion ? State.FAILED_INGESTION : State.FINISHED_INGESTION);
+        synchronized (adapterManager) {
+            adapterManager.notifyAll();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
index 897faae..b032bab 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
@@ -17,40 +17,46 @@ package edu.uci.ics.asterix.metadata.feeds;
 import java.io.Serializable;
 
 /**
- * A unique identifier for a datasource adapter.
+ * A unique identifier for a data source adapter.
  */
 public class AdapterIdentifier implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
     private final String namespace;
-    private final String adapterName;
+    private final String name;
 
-    public AdapterIdentifier(String namespace, String adapterName) {
+    public AdapterIdentifier(String namespace, String name) {
         this.namespace = namespace;
-        this.adapterName = adapterName;
+        this.name = name;
     }
 
     public String getNamespace() {
         return namespace;
     }
 
-    public String getAdapterName() {
-        return adapterName;
+    public String getName() {
+        return name;
     }
 
     @Override
     public int hashCode() {
-        return (namespace + "@" + adapterName).hashCode();
+        return (namespace + "@" + name).hashCode();
 
     }
 
     @Override
     public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+        if (this == o) {
+            return true;
+        }
         if (!(o instanceof AdapterIdentifier)) {
             return false;
         }
         return namespace.equals(((AdapterIdentifier) o).getNamespace())
-                && namespace.equals(((AdapterIdentifier) o).getNamespace());
+                && name.equals(((AdapterIdentifier) o).getName());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index b9a5e73..de288ba 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -15,68 +15,51 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.FeedFrameWriter.Mode;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.asterix.common.feeds.DistributeFeedFrameWriter;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.api.IAdapterRuntimeManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
 
-public class AdapterRuntimeManager implements IAdapterExecutor {
+public class AdapterRuntimeManager implements IAdapterRuntimeManager {
 
     private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
 
-    private final FeedConnectionId feedId;
+    private final FeedId feedId;
 
-    private IFeedAdapter feedAdapter;
+    private final IFeedAdapter feedAdapter;
 
-    private AdapterExecutor adapterExecutor;
+    private final IIntakeProgressTracker tracker;
 
-    private State state;
+    private final AdapterExecutor adapterExecutor;
+
+    private final int partition;
 
-    private int partition;
+    private final ExecutorService executorService;
 
     private IngestionRuntime ingestionRuntime;
 
-    private final IFeedManager feedManager;
-
-    public enum State {
-        /*
-         * Indicates that data from external source will be pushed downstream for storage 
-         */
-        ACTIVE_INGESTION,
-        /*
-         * Indicates that data from external source would be buffered and not pushed downstream 
-         */
-        INACTIVE_INGESTION,
-        /*
-         * Indicates that feed ingestion activity has finished
-         */
-        FINISHED_INGESTION
-    }
+    private State state;
 
-    public AdapterRuntimeManager(FeedConnectionId feedId, IFeedAdapter feedAdapter, FeedFrameWriter writer,
-            int partition, LinkedBlockingQueue<IFeedMessage> inbox, IFeedManager feedManager) {
+    public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, IIntakeProgressTracker tracker,
+            DistributeFeedFrameWriter writer, int partition) {
         this.feedId = feedId;
         this.feedAdapter = feedAdapter;
+        this.tracker = tracker;
         this.partition = partition;
-        this.feedManager = feedManager;
         this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+        this.executorService = Executors.newSingleThreadExecutor();
+        this.state = State.INACTIVE_INGESTION;
     }
 
     @Override
     public void start() throws Exception {
         state = State.ACTIVE_INGESTION;
-        ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
-        feedManager.registerFeedRuntime(ingestionRuntime);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
-        }
-        ExecutorService executorService = feedManager.getFeedExecutorService(feedId);
         executorService.execute(adapterExecutor);
     }
 
@@ -84,19 +67,18 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
     public void stop() {
         try {
             feedAdapter.stop();
-            state = State.FINISHED_INGESTION;
-            synchronized (this) {
-                notifyAll();
-            }
         } catch (Exception exception) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe("Unable to stop adapter " + feedAdapter + ", encountered exception " + exception);
             }
+        } finally {
+            state = State.FINISHED_INGESTION;
+            executorService.shutdown();
         }
     }
 
     @Override
-    public FeedConnectionId getFeedId() {
+    public FeedId getFeedId() {
         return feedId;
     }
 
@@ -109,84 +91,15 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
         return feedAdapter;
     }
 
-    public void setFeedAdapter(IFeedAdapter feedAdapter) {
-        this.feedAdapter = feedAdapter;
-    }
-
-    public static class AdapterExecutor implements Runnable {
-
-        private FeedFrameWriter writer;
-
-        private IFeedAdapter adapter;
-
-        private AdapterRuntimeManager runtimeManager;
-
-        public AdapterExecutor(int partition, FeedFrameWriter writer, IFeedAdapter adapter,
-                AdapterRuntimeManager adapterRuntimeMgr) {
-            this.writer = writer;
-            this.adapter = adapter;
-            this.runtimeManager = adapterRuntimeMgr;
-        }
-
-        @Override
-        public void run() {
-            try {
-                int partition = runtimeManager.getPartition();
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Starting ingestion for partition:" + partition);
-                }
-                adapter.start(partition, writer);
-                runtimeManager.setState(State.FINISHED_INGESTION);
-            } catch (Exception e) {
-                e.printStackTrace();
-                if (LOGGER.isLoggable(Level.SEVERE)) {
-                    LOGGER.severe("Exception during feed ingestion " + e.getMessage());
-                }
-            } finally {
-                synchronized (runtimeManager) {
-                    runtimeManager.notifyAll();
-                }
-            }
-        }
-
-        public FeedFrameWriter getWriter() {
-            return writer;
-        }
-
-        public void setWriter(IFrameWriter writer) {
-            if (this.writer != null) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Switching writer to:" + writer + " from " + this.writer);
-                }
-                this.writer.setWriter(writer);
-            }
-        }
-
+    public IIntakeProgressTracker getTracker() {
+        return tracker;
     }
 
     public synchronized State getState() {
         return state;
     }
 
-    @SuppressWarnings("incomplete-switch")
-    public synchronized void setState(State state) throws HyracksDataException {
-        if (this.state.equals(state)) {
-            return;
-        }
-        switch (state) {
-            case INACTIVE_INGESTION:
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Set " + Mode.STORE + " mode");
-                }
-                adapterExecutor.getWriter().setMode(Mode.STORE);
-                break;
-            case ACTIVE_INGESTION:
-                adapterExecutor.getWriter().setMode(Mode.FORWARD);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Set " + Mode.FORWARD + " mode");
-                }
-                break;
-        }
+    public synchronized void setState(State state) {
         this.state = state;
     }
 
@@ -194,6 +107,7 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
         return adapterExecutor;
     }
 
+    @Override
     public int getPartition() {
         return partition;
     }
@@ -202,4 +116,9 @@ public class AdapterRuntimeManager implements IAdapterExecutor {
         return ingestionRuntime;
     }
 
+    @Override
+    public IIntakeProgressTracker getProgressTracker() {
+        return tracker;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 3bd73a3..3d29aeb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -17,6 +17,7 @@ package edu.uci.ics.asterix.metadata.feeds;
 import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 
@@ -26,16 +27,22 @@ public class BuiltinFeedPolicies {
 
     public static final FeedPolicy BASIC = initializeBasicPolicy();
 
-    public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+    public static final FeedPolicy BASIC_FT = initializeBasicFTPolicy();
 
-    public static final FeedPolicy FAULT_TOLERANT_BASIC_MONITORED = initializeFaultTolerantBasicMonitoredPolicy();
+    public static final FeedPolicy ADVANCED_FT = initializeAdvancedFTPolicy();
 
-    public static final FeedPolicy ELASTIC = initializeFaultTolerantBasicMonitoredElasticPolicy();
+    public static final FeedPolicy ADVANCED_FT_DISCARD = initializeAdvancedFTDiscardPolicy();
 
-    public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
-            FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
+    public static final FeedPolicy ADVANCED_FT_SPILL = initializeAdvancedFTSpillPolicy();
 
-    public static final FeedPolicy DEFAULT_POLICY = BASIC;
+    public static final FeedPolicy ADVANCED_FT_THROTTLE = initializeAdvancedFTThrottlePolicy();
+
+    public static final FeedPolicy ELASTIC = initializeAdvancedFTElasticPolicy();
+
+    public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_FT, ADVANCED_FT,
+            ADVANCED_FT_DISCARD, ADVANCED_FT_SPILL, ADVANCED_FT_THROTTLE, ELASTIC };
+
+    public static final FeedPolicy DEFAULT_POLICY = BASIC_FT;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
@@ -48,79 +55,134 @@ public class BuiltinFeedPolicies {
         return null;
     }
 
-    // BMFE
-    private static FeedPolicy initializeFaultTolerantBasicMonitoredElasticPolicy() {
+    //Brittle
+    private static FeedPolicy initializeBrittlePolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
-        String description = "Basic Monitored Fault-Tolerant Elastic";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMFE", description, policyParams);
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "false");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+
+        String description = "Brittle";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Brittle", description, policyParams);
     }
 
-    //BMF
-    private static FeedPolicy initializeFaultTolerantBasicMonitoredPolicy() {
+    //Basic
+    private static FeedPolicy initializeBasicPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
         policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "Basic Monitored Fault-Tolerant";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMF", description, policyParams);
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+
+        String description = "Basic";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
     }
 
-    //BM
-    private static FeedPolicy initializeBasicMonitoredPolicy() {
+    // BasicFT
+    private static FeedPolicy initializeBasicFTPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
         policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
         policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "false");
+        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "1");
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "false");
+        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "false");
+
         String description = "Basic Monitored Fault-Tolerant";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BM", description, policyParams);
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicFT", description, policyParams);
     }
 
-    //B
-    private static FeedPolicy initializeBasicPolicy() {
+    // AdvancedFT
+    private static FeedPolicy initializeAdvancedFTPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
         policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "Basic";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "B", description, policyParams);
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
+        policyParams.put(FeedPolicyAccessor.AT_LEAST_ONE_SEMANTICS, "true");
+
+        String description = "Basic Monitored Fault-Tolerant with at least once semantics";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT", description, policyParams);
     }
 
-    //Br
-    private static FeedPolicy initializeBrittlePolicy() {
+    // AdvancedFT_Discard
+    private static FeedPolicy initializeAdvancedFTDiscardPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
-        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
-        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "Brittle";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Br", description, policyParams);
+        policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "false");
+        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "100");
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
+       
+        String description = "AdvancedFT 100% Discard during congestion";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Discard", description,
+                policyParams);
+    }
+
+    // AdvancedFT_Spill
+    private static FeedPolicy initializeAdvancedFTSpillPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.TRUE);
+        policyParams.put(FeedPolicyAccessor.MAX_SPILL_SIZE_ON_DISK, "" + FeedPolicyAccessor.NO_LIMIT);
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "true");
+
+        String description = "AdvancedFT 100% Discard during congestion";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Spill", description, policyParams);
+    }
+
+    // AdvancedFT_Throttle
+    private static FeedPolicy initializeAdvancedFTThrottlePolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        policyParams.put(FeedPolicyAccessor.SPILL_TO_DISK_ON_CONGESTION, "" + Boolean.FALSE);
+        policyParams.put(FeedPolicyAccessor.MAX_FRACTION_DISCARD, "" + 0);
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.THROTTLING_ENABLED, "true");
+
+        String description = "AdvancedFT Throttle during congestion";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Throttle", description,
+                policyParams);
+    }
+
+    // AdvancedFT_Elastic
+    private static FeedPolicy initializeAdvancedFTElasticPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.SOFT_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
+        policyParams.put(FeedPolicyAccessor.TIME_TRACKING, "false");
+        policyParams.put(FeedPolicyAccessor.LOGGING_STATISTICS, "true");
+
+        String description = "Basic Monitored Fault-Tolerant Elastic";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "AdvancedFT_Elastic", description,
+                policyParams);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
new file mode 100644
index 0000000..398de35
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/CollectTransformFeedFrameWriter.java
@@ -0,0 +1,140 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedOperatorOutputSideHandler;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * Output-side handler that re-packs field 0 of every incoming tuple into its own frame and
+ * forwards that frame to the downstream writer whenever it fills up.
+ */
+public class CollectTransformFeedFrameWriter implements IFeedOperatorOutputSideHandler {
+
+    private final FeedConnectionId connectionId;
+    private IFrameWriter downstreamWriter;
+    private final FrameTupleAccessor inputFrameTupleAccessor;
+    private final FrameTupleAppender tupleAppender;
+    private final IFrame frame;
+
+    private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+
+    /** Set by {@link #fail()}; once set, {@link #close()} no longer flushes buffered tuples. */
+    private boolean failed = false;
+
+    /**
+     * @param ctx task context used to allocate the output frame
+     * @param downstreamWriter writer that receives the re-packed frames
+     * @param sourceRuntime runtime whose record descriptor describes the incoming frames
+     * @param outputRecordDescriptor not used by this writer
+     * @param connectionId feed connection this writer belongs to
+     * @throws HyracksDataException if the output frame cannot be set up
+     */
+    public CollectTransformFeedFrameWriter(IHyracksTaskContext ctx, IFrameWriter downstreamWriter,
+            ISubscribableRuntime sourceRuntime, RecordDescriptor outputRecordDescriptor, FeedConnectionId connectionId)
+            throws HyracksDataException {
+        this.downstreamWriter = downstreamWriter;
+        RecordDescriptor inputRecordDescriptor = sourceRuntime.getRecordDescriptor();
+        inputFrameTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
+        tupleAppender = new FrameTupleAppender();
+        frame = new VSizeFrame(ctx);
+        tupleAppender.reset(frame, true);
+        this.connectionId = connectionId;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        downstreamWriter.open();
+    }
+
+    /** Copies field 0 of each tuple in {@code buffer} into the output frame. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        inputFrameTupleAccessor.reset(buffer);
+        int nTuple = inputFrameTupleAccessor.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            // Reset up front so state left behind by an earlier failed append is never re-emitted.
+            tupleBuilder.reset();
+            tupleBuilder.addField(inputFrameTupleAccessor, t, 0);
+            appendTupleToFrame();
+        }
+    }
+
+    /** Appends the tuple held by the builder, flushing the output frame first if it is full. */
+    private void appendTupleToFrame() throws HyracksDataException {
+        if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                tupleBuilder.getSize())) {
+            flushOutputFrame();
+            if (!tupleAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                    tupleBuilder.getSize())) {
+                throw new IllegalStateException("Unable to append a tuple of " + tupleBuilder.getSize()
+                        + " bytes to an empty frame");
+            }
+        }
+    }
+
+    /** Sends the output frame downstream and prepares the appender for new tuples. */
+    private void flushOutputFrame() throws HyracksDataException {
+        FrameUtils.flushFrame(frame.getBuffer(), downstreamWriter);
+        tupleAppender.reset(frame, true);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        failed = true;
+        downstreamWriter.fail();
+    }
+
+    /**
+     * Flushes tuples that are still buffered (unless {@link #fail()} was called) and closes the
+     * downstream writer. Without the flush the tail of the stream would be lost, because
+     * {@link #nextFrame(ByteBuffer)} only pushes a frame once it is full.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            if (!failed && tupleAppender.getTupleCount() > 0) {
+                flushOutputFrame();
+            }
+        } finally {
+            // Always release the downstream writer, even if the final flush fails.
+            downstreamWriter.close();
+        }
+    }
+
+    @Override
+    public FeedId getFeedId() {
+        return connectionId.getFeedId();
+    }
+
+    @Override
+    public Type getType() {
+        return Type.COLLECT_TRANSFORM_FEED_OUTPUT_HANDLER;
+    }
+
+    public IFrameWriter getDownstreamWriter() {
+        return downstreamWriter;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    /** Replaces the downstream writer; tuples already buffered go to the new writer. */
+    public void reset(IFrameWriter writer) {
+        this.downstreamWriter = writer;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
deleted file mode 100644
index b71be24..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.logging.Level;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
-import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
-import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class ConditionalPushTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
-        IDataParser dataParser = null;
-        switch (parserType) {
-            case ADM:
-                dataParser = new ADMDataParser();
-                break;
-            case DELIMITED_DATA:
-                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter, quote, hasHeader);
-                break;
-        }
-        return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
-    }
-
-    private final ARecordType recordType;
-    private final Map<String, String> configuration;
-    private IValueParserFactory[] valueParserFactories;
-    private char delimiter;
-    private char quote;
-    private boolean hasHeader;
-    private final ParserType parserType;
-
-    public enum ParserType {
-        ADM,
-        DELIMITED_DATA
-    }
-
-    public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, char quote, boolean hasHeader, Map<String, String> configuration) {
-        this.recordType = recordType;
-        this.valueParserFactories = valueParserFactories;
-        this.delimiter = fieldDelimiter;
-        this.quote = quote;
-        this.hasHeader = hasHeader;
-        this.configuration = configuration;
-        this.parserType = ParserType.DELIMITED_DATA;
-    }
-
-    public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
-        this.recordType = recordType;
-        this.configuration = configuration;
-        this.parserType = ParserType.ADM;
-    }
-}
-
-class ConditionalPushTupleParser extends AbstractTupleParser {
-
-    private final IDataParser dataParser;
-    private int batchSize;
-    private long batchInterval;
-    private boolean continueIngestion = true;
-    private int tuplesInFrame = 0;
-    private TimeBasedFlushTask flushTask;
-    private Timer timer = new Timer();
-    private Object lock = new Object();
-    private boolean activeTimer = false;
-
-    public static final String BATCH_SIZE = "batch-size";
-    public static final String BATCH_INTERVAL = "batch-interval";
-
-    public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) throws HyracksDataException {
-        super(ctx, recType);
-        this.dataParser = dataParser;
-        String propValue = (String) configuration.get(BATCH_SIZE);
-        batchSize = propValue != null ? Integer.parseInt(propValue) : Integer.MAX_VALUE;
-        propValue = (String) configuration.get(BATCH_INTERVAL);
-        batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
-        activeTimer = batchInterval > 0;
-    }
-
-    public void stop() {
-        continueIngestion = false;
-    }
-
-    @Override
-    public IDataParser getDataParser() {
-        return dataParser;
-    }
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        flushTask = new TimeBasedFlushTask(writer, lock);
-        IDataParser parser = getDataParser();
-        try {
-            parser.initialize(in, recType, true);
-            if (activeTimer) {
-                timer.schedule(flushTask, 0, batchInterval);
-            }
-            while (continueIngestion) {
-                tb.reset();
-                if (!parser.parse(tb.getDataOutput())) {
-                    break;
-                }
-                tb.addFieldEndOffset();
-                addTuple(writer);
-            }
-            if (appender.getTupleCount() > 0) {
-                if (activeTimer) {
-                    synchronized (lock) {
-                        appender.flush(writer, true);
-                    }
-                } else {
-                    appender.flush(writer, true);
-                }
-            }
-        } catch (AsterixException ae) {
-            throw new HyracksDataException(ae);
-        } catch (IOException ioe) {
-            throw new HyracksDataException(ioe);
-        } finally {
-            if (activeTimer) {
-                timer.cancel();
-            }
-        }
-    }
-
-    protected void addTuple(IFrameWriter writer) throws HyracksDataException {
-        if (activeTimer) {
-            synchronized (lock) {
-                addTupleToFrame(writer);
-            }
-        } else {
-            addTupleToFrame(writer);
-        }
-    }
-
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException();
-            }
-            if (tuplesInFrame == batchSize) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Batch size exceeded! flushing frame " + "(" + tuplesInFrame + ")");
-                }
-            }
-            tuplesInFrame = 0;
-        }
-        tuplesInFrame++;
-    }
-
-    private class TimeBasedFlushTask extends TimerTask {
-
-        private IFrameWriter writer;
-        private final Object lock;
-
-        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
-            this.writer = writer;
-            this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (tuplesInFrame > 0) {
-                    synchronized (lock) {
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
-                        }
-                        appender.flush(writer, true);
-                        tuplesInFrame = 0;
-                    }
-                }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index 06de1e0..536c702 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
new file mode 100644
index 0000000..b67b813
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedCollectOperatorDescriptor.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.IngestionRuntime;
+import edu.uci.ics.asterix.common.feeds.SubscribableFeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.api.IFeedSubscriptionManager;
+import edu.uci.ics.asterix.common.feeds.api.ISubscribableRuntime;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * FeedCollectOperatorDescriptor creates the runtime that collects the data of a source feed on
+ * behalf of a feed connection. Depending on the subscription location, the data is taken from the
+ * intake stage or the compute stage of the source feed.
+ */
+public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(FeedCollectOperatorDescriptor.class.getName());
+
+    /** Maximum number of times the lookup of the intake runtime is retried. */
+    private static final int MAX_WAIT_CYCLES = 10;
+
+    /** Pause, in milliseconds, between two lookups of the intake runtime. */
+    private static final long WAIT_CYCLE_MILLIS = 2000;
+
+    /** The type associated with the ADM data output from the feed adaptor */
+    private final IAType outputType;
+
+    /** unique identifier for a feed instance. */
+    private final FeedConnectionId connectionId;
+
+    /** Map representation of policy parameters */
+    private final Map<String, String> feedPolicyProperties;
+
+    /** The {@code IFeedSubscriptionManager} obtained from the feed manager in createPushRuntime **/
+    private IFeedSubscriptionManager subscriptionManager;
+
+    /** The source feed from which the feed derives its data from. **/
+    private final FeedId sourceFeedId;
+
+    /** The subscription location at which the recipient feed receives tuples from the source feed **/
+    private final ConnectionLocation subscriptionLocation;
+
+    public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, FeedId sourceFeedId,
+            ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
+            ConnectionLocation subscriptionLocation) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.outputType = atype;
+        this.connectionId = feedConnectionId;
+        this.feedPolicyProperties = feedPolicyProperties;
+        this.sourceFeedId = sourceFeedId;
+        this.subscriptionLocation = subscriptionLocation;
+    }
+
+    /**
+     * Looks up the subscribable runtime of the source feed for the given partition and hands it to
+     * a new {@code FeedCollectOperatorNodePushable}.
+     *
+     * @throws HyracksDataException
+     *             if the source runtime is not found, the node pushable cannot be created, or the
+     *             subscription location is not supported
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
+        this.subscriptionManager = runtimeCtx.getFeedManager().getFeedSubscriptionManager();
+        ISubscribableRuntime sourceRuntime = null;
+        IOperatorNodePushable nodePushable = null;
+        switch (subscriptionLocation) {
+            case SOURCE_FEED_INTAKE_STAGE:
+                try {
+                    SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                            FeedRuntimeType.INTAKE, partition);
+                    sourceRuntime = getIntakeRuntime(feedSubscribableRuntimeId);
+                    if (sourceRuntime == null) {
+                        throw new HyracksDataException("Source intake task not found for source feed id "
+                                + sourceFeedId);
+                    }
+                    nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+                            feedPolicyProperties, partition, nPartitions, sourceRuntime);
+
+                } catch (Exception exception) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
+                    }
+                    throw new HyracksDataException("Initialization of the feed adapter failed", exception);
+                }
+                break;
+            case SOURCE_FEED_COMPUTE_STAGE:
+                SubscribableFeedRuntimeId feedSubscribableRuntimeId = new SubscribableFeedRuntimeId(sourceFeedId,
+                        FeedRuntimeType.COMPUTE, partition);
+                sourceRuntime = (ISubscribableRuntime) subscriptionManager
+                        .getSubscribableRuntime(feedSubscribableRuntimeId);
+                if (sourceRuntime == null) {
+                    throw new HyracksDataException("Source compute task not found for source feed id " + sourceFeedId
+                            + " " + FeedRuntimeType.COMPUTE + "[" + partition + "]");
+                }
+                nodePushable = new FeedCollectOperatorNodePushable(ctx, sourceFeedId, connectionId,
+                        feedPolicyProperties, partition, nPartitions, sourceRuntime);
+                break;
+            default:
+                // Fail with a clear message rather than returning a null node pushable.
+                throw new HyracksDataException("Unsupported subscription location " + subscriptionLocation);
+        }
+        return nodePushable;
+    }
+
+    public FeedConnectionId getFeedConnectionId() {
+        return connectionId;
+    }
+
+    public Map<String, String> getFeedPolicyProperties() {
+        return feedPolicyProperties;
+    }
+
+    public IAType getOutputType() {
+        return outputType;
+    }
+
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptors[0];
+    }
+
+    public FeedId getSourceFeedId() {
+        return sourceFeedId;
+    }
+
+    /**
+     * Polls the subscription manager for the intake runtime, retrying up to MAX_WAIT_CYCLES times
+     * with a pause of WAIT_CYCLE_MILLIS milliseconds before each retry.
+     *
+     * @return the runtime, or {@code null} if it was not registered in time or the wait was interrupted
+     */
+    private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
+        int waitCycleCount = 0;
+        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        while (ingestionRuntime == null && waitCycleCount < MAX_WAIT_CYCLES) {
+            try {
+                Thread.sleep(WAIT_CYCLE_MILLIS);
+                waitCycleCount++;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt status so the thread running this operator can still observe it.
+                Thread.currentThread().interrupt();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Interrupted while waiting for ingestion runtime for subscription " + subscribableRuntimeId);
+                }
+                break;
+            }
+            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
+        }
+        return (IngestionRuntime) ingestionRuntime;
+    }
+
+    public ConnectionLocation getSubscriptionLocation() {
+        return subscriptionLocation;
+    }
+}