Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:00 UTC

[03/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index c7b905d..ebee738 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -21,35 +21,47 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -61,7 +73,11 @@ import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
 /**
  * A utility class for providing helper functions for feeds
@@ -70,9 +86,17 @@ public class FeedUtil {
 
     private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
 
-    public static boolean isFeedActive(FeedActivity feedActivity) {
-        return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_FAILURE) || feedActivity
-                .getActivityType().equals(FeedActivityType.FEED_END)));
+    public static String getFeedPointKeyRep(Feed feed, List<String> appliedFunctions) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(feed.getDataverseName() + ":");
+        builder.append(feed.getFeedName() + ":");
+        if (appliedFunctions != null && !appliedFunctions.isEmpty()) {
+            for (String function : appliedFunctions) {
+                builder.append(function + ":");
+            }
+            builder.deleteCharAt(builder.length() - 1);
+        }
+        return builder.toString();
     }
 
     private static class LocationConstraint {
@@ -80,62 +104,113 @@ public class FeedUtil {
         String location;
     }
 
+    public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
+            throws AsterixException {
+        Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
+        if (dataset == null) {
+            throw new AsterixException("Unknown target dataset :" + datasetName);
+        }
+
+        if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
+            throw new AsterixException("Statement not applicable. Dataset " + datasetName + " is not of required type "
+                    + DatasetType.INTERNAL);
+        }
+        return dataset;
+    }
+
+    public static Feed validateIfFeedExists(String dataverse, String feedName, MetadataTransactionContext ctx)
+            throws MetadataException, AsterixException {
+        Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverse, feedName);
+        if (feed == null) {
+            throw new AsterixException("Unknown source feed: " + feedName);
+        }
+        return feed;
+    }
+
+    public static FeedPolicy validateIfPolicyExists(String dataverse, String policyName, MetadataTransactionContext ctx)
+            throws AsterixException {
+        FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
+        if (feedPolicy == null) {
+            feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    policyName);
+            if (feedPolicy == null) {
+                throw new AsterixException("Unknown feed policy" + policyName);
+            }
+        }
+        return feedPolicy;
+    }
+
     public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
-            FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
+            FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
 
-        FeedPolicyAccessor fpa = new FeedPolicyAccessor(feedPolicy.getProperties());
-        boolean alterationRequired = (fpa.collectStatistics() || fpa.continueOnApplicationFailure()
-                || fpa.continueOnHardwareFailure() || fpa.isElastic());
-        if (!alterationRequired) {
-            return spec;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Original Job Spec:" + spec);
         }
 
         JobSpecification altered = new JobSpecification(spec.getFrameSize());
         Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-
+        boolean preProcessingRequired = preProcessingRequired(feedConnectionId);
         // copy operators
-        String operationId = null;
+        String operandId = null;
         Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+        FeedMetaOperatorDescriptor metaOp = null;
         for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
-            operationId = FeedRuntime.FeedRuntimeId.DEFAULT_OPERATION_ID;
+            operandId = FeedRuntimeId.DEFAULT_OPERAND_ID;
             IOperatorDescriptor opDesc = entry.getValue();
-            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
-                FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
-                FeedIntakeOperatorDescriptor fiop;
-                if (orig.getAdapterFactory() != null) {
-                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
-                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
-                } else {
-                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
-                            orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
-                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
-                }
+            if (opDesc instanceof FeedCollectOperatorDescriptor) {
+                FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
+                FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered,
+                        orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
+                        orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
                 oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
             } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
-                operationId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                        feedPolicy, FeedRuntimeType.STORAGE, operationId);
+                operandId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+                metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+                        FeedRuntimeType.STORE, false, operandId);
                 oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
             } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
-                operationId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                        feedPolicy, FeedRuntimeType.STORAGE, operationId);
+                operandId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+                metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+                        FeedRuntimeType.STORE, false, operandId);
                 oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+
             } else {
                 FeedRuntimeType runtimeType = null;
+                boolean enableSubscriptionMode = false;
+                boolean createMetaOp = true;
+                OperatorDescriptorId opId = null;
                 if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                     IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
                             .getRuntimeFactories()[0];
                     if (runtimeFactory instanceof AssignRuntimeFactory) {
-                        runtimeType = FeedRuntimeType.COMPUTE;
-                    } else if (runtimeFactory instanceof StreamProjectRuntimeFactory) {
-                        runtimeType = FeedRuntimeType.COMMIT;
+                        IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId())
+                                .get(0);
+                        IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
+                        if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+                            runtimeType = preProcessingRequired ? FeedRuntimeType.COMPUTE : FeedRuntimeType.OTHER;
+                            enableSubscriptionMode = preProcessingRequired;
+                        } else {
+                            runtimeType = FeedRuntimeType.OTHER;
+                        }
+                    } else if (runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) {
+                        runtimeType = FeedRuntimeType.ETS;
+                    } else {
+                        runtimeType = FeedRuntimeType.OTHER;
+                    }
+                } else {
+                    if (opDesc instanceof AbstractSingleActivityOperatorDescriptor) {
+                        runtimeType = FeedRuntimeType.OTHER;
+                    } else {
+                        opId = altered.createOperatorDescriptorId(opDesc);
+                        createMetaOp = false;
                     }
                 }
-                FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
-                        feedPolicy, runtimeType, operationId);
-
-                oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+                if (createMetaOp) {
+                    metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+                            runtimeType, enableSubscriptionMode, operandId);
+                    opId = metaOp.getOperatorId();
+                }
+                oldNewOID.put(opDesc.getOperatorId(), opId);
             }
         }
 
@@ -240,35 +315,181 @@ public class FeedUtil {
 
     }
 
-    public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
-            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+    public static void increaseCardinality(JobSpecification spec, FeedRuntimeType compute, int requiredCardinality,
+            List<String> newLocations) throws AsterixException {
+        IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc,
+                nChooseK(requiredCardinality, newLocations));
+
+    }
+
+    public static void decreaseComputeCardinality(JobSpecification spec, FeedRuntimeType compute,
+            int requiredCardinality, List<String> currentLocations) throws AsterixException {
+        IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+        String[] chosenLocations = nChooseK(requiredCardinality, currentLocations);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc, chosenLocations);
+    }
+
+    private static IOperatorDescriptor alterJobSpecForComputeCardinality(JobSpecification spec, int requiredCardinality)
+            throws AsterixException {
+        Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = spec.getConnectorMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
+                .getConnectorOperatorMap();
+
+        IOperatorDescriptor sourceOp = null;
+        IOperatorDescriptor targetOp = null;
+        IConnectorDescriptor connDesc = null;
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
+                .entrySet()) {
+            ConnectorDescriptorId cid = entry.getKey();
+            sourceOp = entry.getValue().getKey().getKey();
+            if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+                targetOp = entry.getValue().getValue().getKey();
+                if (targetOp instanceof FeedMetaOperatorDescriptor
+                        && (((FeedMetaOperatorDescriptor) targetOp).getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
+                    connDesc = connectors.get(cid);
+                    break;
+                } else {
+                    throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
+                }
+            }
+        }
+
+        Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorInputMap = spec.getOperatorInputMap();
+        boolean removed = operatorInputMap.get(targetOp.getOperatorId()).remove(connDesc);
+        if (!removed) {
+            throw new AsterixException("Connector desc not found");
+        }
+        Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorOutputMap = spec.getOperatorOutputMap();
+        removed = operatorOutputMap.get(sourceOp.getOperatorId()).remove(connDesc);
+        if (!removed) {
+            throw new AsterixException("Connector desc not found");
+        }
+        spec.getConnectorMap().remove(connDesc.getConnectorId());
+        connectorOpMap.remove(connDesc.getConnectorId());
+
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(requiredCardinality);
+        MToNPartitioningConnectorDescriptor newConnector = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        spec.getConnectorMap().put(newConnector.getConnectorId(), newConnector);
+        spec.connect(newConnector, sourceOp, 0, targetOp, 0);
+
+        // ==============================================================================
+        Set<Constraint> userConstraints = spec.getUserConstraints();
+        Constraint countConstraint = null;
+        Constraint locationConstraint = null;
+        List<LocationConstraint> locations = new ArrayList<LocationConstraint>();
+        IOperatorDescriptor changingOpDesc = null;
+
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            OperatorDescriptorId opId;
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT: {
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+                    if (opDesc instanceof FeedMetaOperatorDescriptor) {
+                        FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+                        if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+                            countConstraint = constraint;
+                            changingOpDesc = opDesc;
+                        }
+                    }
+                    break;
+                }
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+                    if (opDesc instanceof FeedMetaOperatorDescriptor) {
+                        FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+                        if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+                            locationConstraint = constraint;
+                            changingOpDesc = opDesc;
+                            String location = (String) ((ConstantExpression) cexpr).getValue();
+                            LocationConstraint lc = new LocationConstraint();
+                            lc.location = location;
+                            lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
+                            locations.add(lc);
+                        }
+                    }
+
+                    break;
+            }
+        }
+
+        userConstraints.remove(countConstraint);
+        if (locationConstraint != null) {
+            userConstraints.remove(locationConstraint);
+        }
+
+        return changingOpDesc;
+    }
+
+    private static String[] nChooseK(int k, List<String> locations) {
+        String[] result = new String[k];
+        for (int i = 0; i < k; i++) {
+            result[i] = locations.get(i);
+        }
+        return result;
+    }
+
+    private static boolean preProcessingRequired(FeedConnectionId connectionId) {
+        MetadataTransactionContext ctx = null;
+        Feed feed = null;
+        boolean preProcessingRequired = false;
+        try {
+            MetadataManager.INSTANCE.acquireReadLatch();
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(), connectionId
+                    .getFeedId().getFeedName());
+            preProcessingRequired = feed.getAppliedFunction() != null;
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception abortException) {
+                    e.addSuppressed(abortException);
+                    throw new IllegalStateException(e);
+                }
+            }
+        } finally {
+            MetadataManager.INSTANCE.releaseReadLatch();
+        }
+        return preProcessingRequired;
+    }
+
+    public static Triple<IFeedAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(
+            PrimaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
+            throws AlgebricksException {
 
         String adapterName = null;
         DatasourceAdapter adapterEntity = null;
         String adapterFactoryClassname = null;
-        IAdapterFactory adapterFactory = null;
+        IFeedAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
-        Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
+        Triple<IFeedAdapterFactory, ARecordType, AdapterType> feedProps = null;
+        AdapterType adapterType = null;
         try {
-            adapterName = feed.getAdapterName();
+            adapterName = feed.getAdaptorName();
             adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
                     adapterName);
             if (adapterEntity == null) {
                 adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
             }
-
             if (adapterEntity != null) {
+                adapterType = adapterEntity.getType();
                 adapterFactoryClassname = adapterEntity.getClassname();
-                switch (adapterEntity.getType()) {
+                switch (adapterType) {
                     case INTERNAL:
-                        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                        adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
                         break;
                     case EXTERNAL:
                         String[] anameComponents = adapterName.split("#");
                         String libraryName = anameComponents[0];
                         ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(),
                                 libraryName);
-                        adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        adapterFactory = (IFeedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
                 }
             } else {
@@ -276,42 +497,90 @@ public class FeedUtil {
                 if (adapterFactoryClassname == null) {
                     adapterFactoryClassname = adapterName;
                 }
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            }
-
-            Map<String, String> configuration = feed.getAdapterConfiguration();
-
-            switch (adapterFactory.getAdapterType()) {
-                case TYPED:
-                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
-                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
-                    break;
-                case GENERIC:
-                    String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
-                    if (outputTypeName == null) {
-                        throw new IllegalArgumentException(
-                                "You must specify the datatype associated with the incoming data. Datatype is specified by the "
-                                        + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
-                    }
-                    Datatype datatype = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, feed.getDataverseName(),
-                            outputTypeName);
-                    if (datatype == null) {
-                        throw new Exception("no datatype \"" + outputTypeName + "\" in dataverse \""
-                                + feed.getDataverseName() + "\"");
-                    }
-                    adapterOutputType = (ARecordType) datatype.getDatatype();
-                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
-                    break;
-                default:
-                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+                adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                adapterType = AdapterType.INTERNAL;
             }
 
-            feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
-                    adapterEntity.getType());
+            Map<String, String> configuration = feed.getAdaptorConfiguration();
+            configuration.putAll(policyAccessor.getFeedPolicy());
+            adapterOutputType = getOutputType(feed, configuration);
+            adapterFactory.configure(configuration, adapterOutputType);
+            feedProps = new Triple<IFeedAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
+                    adapterType);
         } catch (Exception e) {
             e.printStackTrace();
-            throw new AlgebricksException("unable to create adapter  " + e);
+            throw new AlgebricksException("unable to create adapter " + e);
         }
         return feedProps;
     }
+
+    private static ARecordType getOutputType(PrimaryFeed feed, Map<String, String> configuration) throws Exception {
+        ARecordType outputType = null;
+        String fqOutputType = configuration.get(IAdapterFactory.KEY_TYPE_NAME);
+
+        if (fqOutputType == null) {
+            throw new IllegalArgumentException("No output type specified");
+        }
+        String[] dataverseAndType = fqOutputType.split("[.]");
+        String dataverseName;
+        String datatypeName;
+
+        if (dataverseAndType.length == 1) {
+            datatypeName = dataverseAndType[0];
+            dataverseName = feed.getDataverseName();
+        } else if (dataverseAndType.length == 2) {
+            dataverseName = dataverseAndType[0];
+            datatypeName = dataverseAndType[1];
+        } else
+            throw new IllegalArgumentException("Invalid value for the parameter " + IAdapterFactory.KEY_TYPE_NAME);
+
+        MetadataTransactionContext ctx = null;
+        MetadataManager.INSTANCE.acquireReadLatch();
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
+            IAType type = t.getDatatype();
+            if (type.getTypeTag() != ATypeTag.RECORD) {
+                throw new IllegalStateException();
+            }
+            outputType = (ARecordType) t.getDatatype();
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                MetadataManager.INSTANCE.abortTransaction(ctx);
+            }
+            throw e;
+        } finally {
+            MetadataManager.INSTANCE.releaseReadLatch();
+        }
+        return outputType;
+    }
+
+    public static String getSecondaryFeedOutput(SecondaryFeed feed, FeedPolicyAccessor policyAccessor,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+        String outputType = null;
+        String primaryFeedName = feed.getSourceFeedName();
+        Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
+        FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
+        if (appliedFunction == null) {
+            Triple<IFeedAdapterFactory, ARecordType, AdapterType> result = getPrimaryFeedFactoryAndOutput(
+                    (PrimaryFeed) primaryFeed, policyAccessor, mdTxnCtx);
+            outputType = result.second.getTypeName();
+        } else {
+            Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+            if (function != null) {
+                if (function.getLanguage().equals(Function.LANGUAGE_AQL)) {
+                    throw new NotImplementedException(
+                            "Secondary feeds derived from a source feed that has an applied AQL function are not supported yet.");
+                } else {
+                    outputType = function.getReturnType();
+                }
+            } else {
+                throw new IllegalArgumentException("Function " + appliedFunction
+                        + " associated with source feed not found in Metadata.");
+            }
+        }
+        return outputType;
+    }
+
 }
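
For reference, the feed-point key built by getFeedPointKeyRep above is simply the dataverse, the feed name, and any applied functions joined by ':'. The standalone sketch below (JDK only, with sample dataverse/feed/function names that are not part of this patch) reproduces that logic and prints the resulting keys:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class FeedPointKeyExample {

        // Same logic as FeedUtil.getFeedPointKeyRep: <dataverse>:<feedName>:[<fn1>:<fn2>...]
        static String keyRep(String dataverse, String feedName, List<String> appliedFunctions) {
            StringBuilder builder = new StringBuilder();
            builder.append(dataverse + ":");
            builder.append(feedName + ":");
            if (appliedFunctions != null && !appliedFunctions.isEmpty()) {
                for (String function : appliedFunctions) {
                    builder.append(function + ":");
                }
                builder.deleteCharAt(builder.length() - 1);
            }
            return builder.toString();
        }

        public static void main(String[] args) {
            // "feeds:TwitterFeed:parseTweet" -- one applied function
            System.out.println(keyRep("feeds", "TwitterFeed", Arrays.asList("parseTweet")));
            // "feeds:TwitterFeed:" -- no applied functions (trailing separator retained)
            System.out.println(keyRep("feeds", "TwitterFeed", Collections.<String> emptyList()));
        }
    }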

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
new file mode 100644
index 0000000..731ca56
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedWork;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWorkEventListener;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWorkManager;
+
+/**
+ * Handles asynchronous execution of feed management related tasks.
+ */
+public class FeedWorkManager implements IFeedWorkManager {
+
+    public static final FeedWorkManager INSTANCE = new FeedWorkManager();
+
+    private final ExecutorService executorService = Executors.newCachedThreadPool();
+
+    private FeedWorkManager() {
+    }
+
+    public void submitWork(IFeedWork work, IFeedWorkEventListener listener) {
+        Runnable runnable = work.getRunnable();
+        try {
+            executorService.execute(runnable);
+            listener.workCompleted(work);
+        } catch (Exception e) {
+            listener.workFailed(work, e);
+        }
+    }
+
+}
\ No newline at end of file
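
A rough usage sketch of the submit-and-notify pattern above. The Work/WorkEventListener stand-ins below are hypothetical and only mirror the calls made in submitWork (getRunnable, workCompleted, workFailed); the real IFeedWork and IFeedWorkEventListener interfaces are defined elsewhere in asterix-common and may differ. Note that workCompleted fires as soon as the executor accepts the task, not when the runnable finishes:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FeedWorkManagerSketch {

        // Hypothetical stand-ins shaped after the calls in FeedWorkManager.submitWork;
        // not the real IFeedWork / IFeedWorkEventListener interfaces.
        interface Work {
            Runnable getRunnable();
        }

        interface WorkEventListener {
            void workCompleted(Work work);

            void workFailed(Work work, Exception e);
        }

        private final ExecutorService executorService = Executors.newCachedThreadPool();

        public void submitWork(Work work, WorkEventListener listener) {
            try {
                executorService.execute(work.getRunnable());
                listener.workCompleted(work); // invoked on successful submission
            } catch (Exception e) {
                listener.workFailed(work, e); // e.g. rejected execution
            }
        }

        public static void main(String[] args) {
            FeedWorkManagerSketch manager = new FeedWorkManagerSketch();
            Work work = () -> () -> System.out.println("feed management task running");
            manager.submitWork(work, new WorkEventListener() {
                public void workCompleted(Work w) {
                    System.out.println("work submitted");
                }

                public void workFailed(Work w, Exception e) {
                    e.printStackTrace();
                }
            });
            manager.executorService.shutdown();
        }
    }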

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
deleted file mode 100644
index f0a3aa4..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
- * Acts as a marker interface indicating that the implementation provides functionality
- * for creating an adapter.
- */
-public interface IAdapterFactory extends Serializable {
-
-    /**
-     * A 'GENERIC' adapter can be configured to return a given datatype.
-     * A 'TYPED' adapter returns records with a pre-defined datatype.
-     */
-    public enum AdapterType {
-        GENERIC,
-        TYPED
-    }
-
-    public enum SupportedOperation {
-        READ,
-        WRITE,
-        READ_WRITE
-    }
-
-    /**
-     * Returns the type of adapter indicating if the adapter can be used for
-     * reading from an external data source or writing to an external data
-     * source or can be used for both purposes.
-     * 
-     * @see SupportedOperation
-     * @return
-     */
-    public SupportedOperation getSupportedOperations();
-
-    /**
-     * Returns the display name corresponding to the Adapter type that is created by the factory.
-     * 
-     * @return the display name
-     */
-    public String getName();
-
-    /**
-     * Returns the type of the adapter (GENERIC or TYPED)
-     * 
-     * @return
-     */
-    public AdapterType getAdapterType();
-
-    /**
-     * Returns a list of partition constraints. A partition constraint can be a
-     * requirement to execute at a particular location or could be cardinality
-     * constraints indicating the number of instances that need to run in
-     * parallel. example, a IDatasourceAdapter implementation written for data
-     * residing on the local file system of a node cannot run on any other node
-     * and thus has a location partition constraint. The location partition
-     * constraint can be expressed as a node IP address or a node controller id.
-     * In the former case, the IP address is translated to a node controller id
-     * running on the node with the given IP address.
-     */
-    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
-    /**
-     * Creates an instance of IDatasourceAdapter.
-     * 
-     * @param HyracksTaskContext
-     * @param partition
-     * @return An instance of IDatasourceAdapter.
-     * @throws Exception
-     */
-    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
deleted file mode 100644
index a4c5de9..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-
-/**
- * A super interface implemented by a data source adapter. An adapter can be a
- * pull based or push based. This interface provides all common APIs that need
- * to be implemented by each adapter irrespective of the the kind of
- * adapter(pull or push).
- */
-public interface IDatasourceAdapter extends Serializable {
-
-    /**
-     * An adapter can be used to read from an external data source and may also
-     * allow writing to the external data source. This enum type indicates the
-     * kind of operations supported by the adapter.
-     */
-
-    /**
-     * Triggers the adapter to begin ingesting data from the external source.
-     * 
-     * @param partition
-     *            The adapter could be running with a degree of parallelism.
-     *            partition corresponds to the i'th parallel instance.
-     * @param writer
-     *            The instance of frame writer that is used by the adapter to
-     *            write frame to. Adapter packs the fetched bytes (from external source),
-     *            packs them into frames and forwards the frames to an upstream receiving
-     *            operator using the instance of IFrameWriter.
-     * @throws Exception
-     */
-    public void start(int partition, IFrameWriter writer) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
deleted file mode 100644
index 55abd73..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-/**
- * Interface implemented by an adapter that can be controlled or managed by external
- * commands (stop,alter)
- */
-public interface IFeedAdapter extends IDatasourceAdapter {
-
-    public enum DataExchangeMode {
-        PULL,
-        PUSH
-    }
-
-    /**
-     * @return
-     */
-    public DataExchangeMode getDataExchangeMode();
-
-    /**
-     * Discontinue the ingestion of data and end the feed.
-     * 
-     * @throws Exception
-     */
-    public void stop() throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
new file mode 100644
index 0000000..fc6bfe7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+
+public interface IFeedAdapterFactory extends IAdapterFactory {
+
+    public boolean isRecordTrackingEnabled();
+
+    public IIntakeProgressTracker createIntakeProgressTracker();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
deleted file mode 100644
index 47aa8d9..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-
-public interface IGenericAdapterFactory extends IAdapterFactory {
-
-    public static final String KEY_TYPE_NAME = "type-name";
-
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
-
-    public void setFiles(List<ExternalFile> files) throws AlgebricksException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
index 50641b0..dbc16d6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
@@ -14,6 +14,8 @@ x * Copyright 2009-2013 by The Regents of the University of California
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+
 public interface IPullBasedFeedAdapter extends IFeedAdapter {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
index 6faa44b..de2086e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -16,6 +16,7 @@ package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 
 public interface ITypedAdapterFactory extends IAdapterFactory {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
deleted file mode 100644
index 6cfdc7f..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-
-public class IngestionRuntime extends FeedRuntime {
-
-    private AdapterRuntimeManager adapterRuntimeManager;
-
-    public IngestionRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType,
-            AdapterRuntimeManager adapterRuntimeManager) {
-        super(feedId, partition, feedRuntimeType);
-        this.adapterRuntimeManager = adapterRuntimeManager;
-    }
-
-    public AdapterRuntimeManager getAdapterRuntimeManager() {
-        return adapterRuntimeManager;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
new file mode 100644
index 0000000..8c6d54d
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A feed control message asking the feed pipeline to prepare for a stall. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ */
+public class PrepareStallMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    private final int computePartitionsRetainLimit;
+
+    public PrepareStallMessage(FeedConnectionId connectionId, int computePartitionsRetainLimit) {
+        super(MessageType.PREPARE_STALL);
+        this.connectionId = connectionId;
+        this.computePartitionsRetainLimit = computePartitionsRetainLimit;
+    }
+
+    @Override
+    public String toString() {
+        return MessageType.PREPARE_STALL.name() + "  " + connectionId;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.COMPUTE_PARTITION_RETAIN_LIMIT, computePartitionsRetainLimit);
+        return obj;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public int getComputePartitionsRetainLimit() {
+        return computePartitionsRetainLimit;
+    }
+
+}
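
For illustration, the JSON produced by toJSON above has roughly the shape built below. The literal key strings are placeholders standing in for FeedConstants.MessageConstants.* (whose actual values are not shown in this patch), and the dataverse/feed/dataset values are samples:

    import org.json.JSONException;
    import org.json.JSONObject;

    public class PrepareStallMessageShape {

        public static void main(String[] args) throws JSONException {
            JSONObject obj = new JSONObject();
            obj.put("message-type", "PREPARE_STALL");     // messageType.name()
            obj.put("dataverse", "feeds");                // connectionId.getFeedId().getDataverse()
            obj.put("feed", "TwitterFeed");               // connectionId.getFeedId().getFeedName()
            obj.put("dataset", "Tweets");                 // connectionId.getDatasetName()
            obj.put("compute-partition-retain-limit", 1); // computePartitionsRetainLimit
            System.out.println(obj.toString());
        }
    }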

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
index d7e2e2f..404e14b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
@@ -31,8 +31,7 @@ public class RemoteSocketMessageListener {
     private final String host;
     private final int port;
     private final LinkedBlockingQueue<String> outbox;
-
-    private ExecutorService executorService = Executors.newFixedThreadPool(10);
+    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
 
     private RemoteMessageListenerServer listenerServer;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
new file mode 100644
index 0000000..515b78e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+
+/**
+ * Listens for messages at a configured port and redirects them to
+ * an instance of {@code IMessageReceiver}.
+ * Messages may arrive in parallel from multiple senders. Each sender is handled by
+ * a respective instance of {@code ClientHandler}.
+ */
+public class SocketMessageListener {
+
+    private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
+
+    private final int port;
+    private final IMessageReceiver<String> messageReceiver;
+    private final MessageListenerServer listenerServer;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
+        this.port = port;
+        this.messageReceiver = messageReceiver;
+        this.listenerServer = new MessageListenerServer(port, messageReceiver);
+    }
+
+    public void stop() throws IOException {
+        listenerServer.stop();
+        messageReceiver.close(false);
+        if (!executorService.isShutdown()) {
+            executorService.shutdownNow();
+        }
+    }
+
+    public void start() {
+        messageReceiver.start();
+        executorService.execute(listenerServer);
+    }
+
+    private static class MessageListenerServer implements Runnable {
+
+        private final int port;
+        private final IMessageReceiver<String> messageReceiver;
+        private ServerSocket server;
+        private final Executor executor;
+
+        public MessageListenerServer(int port, IMessageReceiver<String> messageReceiver) {
+            this.port = port;
+            this.messageReceiver = messageReceiver;
+            this.executor = Executors.newCachedThreadPool();
+        }
+
+        public void stop() throws IOException {
+            server.close();
+        }
+
+        @Override
+        public void run() {
+            Socket client = null;
+            try {
+                server = new ServerSocket(port);
+                while (true) {
+                    client = server.accept();
+                    ClientHandler handler = new ClientHandler(client, messageReceiver);
+                    executor.execute(handler);
+                }
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to start message listener on port " + port);
+                }
+            } finally {
+                if (server != null) {
+                    try {
+                        server.close();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        private static class ClientHandler implements Runnable {
+
+            private static final char EOL = '\n';
+
+            private final Socket client;
+            private final IMessageReceiver<String> messageReceiver;
+
+            public ClientHandler(Socket client, IMessageReceiver<String> messageReceiver) {
+                this.client = client;
+                this.messageReceiver = messageReceiver;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    InputStream in = client.getInputStream();
+                    CharBuffer buffer = CharBuffer.allocate(5000);
+                    char ch;
+                    int read;
+                    while (true) {
+                        // in.read() returns -1 at end-of-stream; check before narrowing to char
+                        read = in.read();
+                        if (read == -1) {
+                            break;
+                        }
+                        ch = (char) read;
+                        while (ch != EOL) {
+                            buffer.put(ch);
+                            read = in.read();
+                            if (read == -1) {
+                                break;
+                            }
+                            ch = (char) read;
+                        }
+                        buffer.flip();
+                        String s = new String(buffer.array(), 0, buffer.limit());
+                        messageReceiver.sendMessage(s + "\n");
+                        buffer.position(0);
+                        buffer.limit(5000);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Unable to process messages from client " + client);
+                    }
+                } finally {
+                    if (client != null) {
+                        try {
+                            client.close();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+}
\ No newline at end of file

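For reference, a minimal usage sketch of the listener above. The helper class, its name and the
sleep-based lifetime are illustrative only; the sketch assumes nothing beyond the IMessageReceiver
methods exercised by SocketMessageListener itself (start(), sendMessage(String), close(boolean)),
with the receiver instance supplied by the caller.

    import java.io.IOException;

    import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
    import edu.uci.ics.asterix.metadata.feeds.SocketMessageListener;

    public class SocketMessageListenerExample {

        // Hypothetical helper: accept newline-terminated messages on the given port for a while,
        // handing each one to the supplied receiver, then shut everything down.
        public static void listenFor(int port, IMessageReceiver<String> receiver, long millis)
                throws IOException, InterruptedException {
            SocketMessageListener listener = new SocketMessageListener(port, receiver);
            listener.start();     // starts the receiver and begins accepting senders
            Thread.sleep(millis); // placeholder for real work
            listener.stop();      // closes the server socket, the receiver and the worker pool
        }
    }
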
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
new file mode 100644
index 0000000..c22d2e1
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+public class TerminateDataFlowMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedConnectionId connectionId;
+
+    public TerminateDataFlowMessage(FeedConnectionId connectionId) {
+        super(MessageType.TERMINATE_FLOW);
+        this.connectionId = connectionId;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        return obj;
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+}

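A minimal sketch of producing the wire form of this message; the helper class is hypothetical and
the FeedConnectionId is assumed to be already held by the caller.

    import org.json.JSONException;

    import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
    import edu.uci.ics.asterix.metadata.feeds.TerminateDataFlowMessage;

    public class TerminateMessageExample {

        // Serializes a terminate request; the JSON carries the message type plus the
        // dataverse, feed and dataset that identify the connection to be closed.
        public static String toWireFormat(FeedConnectionId connectionId) throws JSONException {
            return new TerminateDataFlowMessage(connectionId).toJSON().toString();
        }
    }
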
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
new file mode 100644
index 0000000..e36ea2b
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A feed control message indicating the need to execute a given AQL statement.
+ */
+public class XAQLFeedMessage extends FeedMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String aql;
+    private final FeedConnectionId connectionId;
+
+    public XAQLFeedMessage(FeedConnectionId connectionId, String aql) {
+        super(MessageType.XAQL);
+        this.connectionId = connectionId;
+        this.aql = aql;
+    }
+
+    @Override
+    public String toString() {
+        return messageType.name() + " " + connectionId + " [" + aql + "] ";
+    }
+
+    public FeedConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public String getAql() {
+        return aql;
+    }
+
+    @Override
+    public JSONObject toJSON() throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+        obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+        obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+        obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+        obj.put(FeedConstants.MessageConstants.AQL, aql);
+        return obj;
+    }
+
+}

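Along the same lines, a sketch of wrapping an AQL statement for shipment over an existing
connection; the helper class is hypothetical and the FeedConnectionId comes from the caller.

    import org.json.JSONException;

    import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
    import edu.uci.ics.asterix.metadata.feeds.XAQLFeedMessage;

    public class XAQLMessageExample {

        // Builds the JSON form; besides the usual dataverse/feed/dataset fields it also
        // carries the AQL payload to be executed on arrival.
        public static String toWireFormat(FeedConnectionId connectionId, String aql) throws JSONException {
            return new XAQLFeedMessage(connectionId, aql).toJSON().toString();
        }
    }
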
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
index 086caf1..8165c9c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -38,8 +38,8 @@ public class MetadataBuiltinFunctions {
         addMetadataBuiltinFunctions();
         AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.DATASET, false);
         AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.DATASET);
-        AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INGEST, false);
-        AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INGEST);
+        AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_COLLECT, false);
+        AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_COLLECT);
         AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INTERCEPT, false);
         AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INTERCEPT);
     }
@@ -89,17 +89,17 @@ public class MetadataBuiltinFunctions {
             }
         }, true);
 
-        AsterixBuiltinFunctions.addPrivateFunction(AsterixBuiltinFunctions.FEED_INGEST, new IResultTypeComputer() {
+        AsterixBuiltinFunctions.addPrivateFunction(AsterixBuiltinFunctions.FEED_COLLECT, new IResultTypeComputer() {
 
             @Override
             public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
                     IMetadataProvider<?, ?> mp) throws AlgebricksException {
                 AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
-                if (f.getArguments().size() != 3) {
-                    throw new AlgebricksException("Incorrect number of arguments -> arity is 3, not "
-                            + f.getArguments().size());
+                if (f.getArguments().size() != AsterixBuiltinFunctions.FEED_COLLECT.getArity()) {
+                    throw new AlgebricksException("Incorrect number of arguments -> arity is "
+                            + AsterixBuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
                 }
-                ILogicalExpression a1 = f.getArguments().get(1).getValue();
+                ILogicalExpression a1 = f.getArguments().get(5).getValue();
                 IAType t1 = (IAType) env.getType(a1);
                 if (t1.getTypeTag() == ATypeTag.ANY) {
                     return BuiltinType.ANY;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
index 436fe4d..e456f89 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -15,6 +15,7 @@ public class MetadataLockManager {
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
 
@@ -24,6 +25,7 @@ public class MetadataLockManager {
         functionsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
         nodeGroupsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
         feedsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        feedPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
         compactionPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
         dataTypeLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
     }
@@ -205,6 +207,19 @@ public class MetadataLockManager {
     public void releaseFeedWriteLock(String feedName) {
         feedsLocks.get(feedName).writeLock().unlock();
     }
+    
+    public void acquireFeedPolicyWriteLock(String policyName) {
+        ReentrantReadWriteLock fLock = feedPolicyLocks.get(policyName);
+        if (fLock == null) {
+            feedPolicyLocks.putIfAbsent(policyName, new ReentrantReadWriteLock());
+            fLock = feedPolicyLocks.get(policyName);
+        }
+        fLock.writeLock().lock();
+    }
+
+    public void releaseFeedPolicyWriteLock(String policyName) {
+        feedPolicyLocks.get(policyName).writeLock().unlock();
+    }
 
     public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
         ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
@@ -411,6 +426,16 @@ public class MetadataLockManager {
         releaseFeedWriteLock(feedFullyQualifiedName);
         releaseDataverseReadLock(dataverseName);
     }
+    
+    public void dropFeedPolicyBegin(String dataverseName, String policyName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedPolicyWriteLock(policyName);
+    }
+
+    public void dropFeedPolicyEnd(String dataverseName, String policyName) {
+        releaseFeedPolicyWriteLock(policyName);
+        releaseDataverseReadLock(dataverseName);
+    }
 
     public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
         acquireDataverseReadLock(dataverseName);
@@ -434,6 +459,16 @@ public class MetadataLockManager {
         releaseDataverseReadLock(dataverseName);
     }
 
+    public void createFeedPolicyBegin(String dataverseName, String policyName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedPolicyWriteLock(policyName);
+    }
+
+    public void createFeedPolicyEnd(String dataverseName, String policyName) {
+        releaseFeedPolicyWriteLock(policyName);
+        releaseDataverseReadLock(dataverseName);
+    }
+    
     public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
             String feedFullyQualifiedName) {
         acquireDataverseReadLock(dataverseName);
@@ -446,6 +481,19 @@ public class MetadataLockManager {
         releaseDatasetReadLock(datasetFullyQualifiedName);
         releaseDataverseReadLock(dataverseName);
     }
+    
+    public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
+            String feedFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetReadLock(datasetFullyQualifiedName);
+        acquireFeedReadLock(feedFullyQualifiedName);
+    }
+    
+    public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+        releaseFeedReadLock(feedFullyQualifiedName);
+        releaseDatasetReadLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
 
     public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
         acquireDataverseReadLock(dataverseName);

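A sketch of the intended Begin/End pairing for the new feed-policy entry points; the lock-manager
instance and the metadata work in the try block are assumed to be supplied by the caller (e.g. the
statement executor), and the helper class is hypothetical.

    import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;

    public class FeedPolicyLockingExample {

        // Begin acquires the dataverse read lock and then the policy write lock;
        // End releases them in reverse order, even if the metadata work fails.
        public static void createPolicy(MetadataLockManager lockMgr, String dataverseName, String policyName) {
            lockMgr.createFeedPolicyBegin(dataverseName, policyName);
            try {
                // ... add the feed policy entity to the metadata ...
            } finally {
                lockMgr.createFeedPolicyEnd(dataverseName, policyName);
            }
        }
    }

subscribeFeedBegin/subscribeFeedEnd pair up the same way over the dataverse, dataset and feed read
locks.
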
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index cfecd9c..fa99064 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -273,8 +273,8 @@ public class AsterixBuiltinFunctions {
             "string-join", 2);
 
     public final static FunctionIdentifier DATASET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset", 1);
-    public final static FunctionIdentifier FEED_INGEST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
-            "feed-ingest", 3);
+    public final static FunctionIdentifier FEED_COLLECT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "feed-collect", 6);
     public final static FunctionIdentifier FEED_INTERCEPT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "feed-intercept", 1);
 
@@ -1253,14 +1253,14 @@ public class AsterixBuiltinFunctions {
 
     static {
         datasetFunctions.add(getAsterixFunctionInfo(DATASET));
-        datasetFunctions.add(getAsterixFunctionInfo(FEED_INGEST));
+        datasetFunctions.add(getAsterixFunctionInfo(FEED_COLLECT));
         datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
         datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
     }
 
     static {
         addUnnestFun(DATASET, false);
-        addUnnestFun(FEED_INGEST, false);
+        addUnnestFun(FEED_COLLECT, false);
         addUnnestFun(FEED_INTERCEPT, false);
         addUnnestFun(RANGE, true);
         addUnnestFun(SCAN_COLLECTION, false);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
index eb4a027..7d98846 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
@@ -275,6 +275,10 @@ public class ARecordPointable extends AbstractVisitablePointable {
     public List<IVisitablePointable> getFieldValues() {
         return fieldValues;
     }
+    
+    public ARecordType getInputRecordType() {
+        return inputRecType;
+    }
 
     @Override
     public <R, T> R accept(IVisitablePointableVisitor<R, T> vistor, T tag) throws AsterixException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
index cef7937..a24f2de 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
@@ -18,6 +18,7 @@ import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -46,6 +47,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
     private AsterixMetadataProperties metadataProperties;
     private AsterixStorageProperties storageProperties;
     private AsterixTransactionProperties txnProperties;
+    private AsterixFeedProperties feedProperties;
 
     private IHyracksClientConnection hcc;
 
@@ -59,6 +61,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
         INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
         INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor);
         INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
+        INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
         INSTANCE.hcc = hcc;
         Logger.getLogger("edu.uci.ics").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
@@ -102,6 +105,11 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
         return externalProperties;
     }
 
+    @Override
+    public AsterixFeedProperties getFeedProperties() {
+        return feedProperties;
+    }
+    
     public IHyracksClientConnection getHcc() {
         return hcc;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 8e393d7..de3b1b9 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -28,6 +28,7 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -70,12 +71,9 @@ public class AsterixClusterProperties {
         }
     }
 
-    public enum State {
-        ACTIVE,
-        UNUSABLE
-    }
 
-    private State state = State.UNUSABLE;
+    private ClusterState state = ClusterState.UNUSABLE;
 
     public synchronized void removeNCConfiguration(String nodeId) {
         // state = State.UNUSABLE;
@@ -87,7 +85,7 @@ public class AsterixClusterProperties {
         ncConfiguration.put(nodeId, configuration);
         if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
                 .getNodeNames().size()) {
-            state = State.ACTIVE;
+            state = ClusterState.ACTIVE;
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
@@ -129,7 +127,7 @@ public class AsterixClusterProperties {
         return ncConfig.get(IO_DEVICES).split(",");
     }
 
-    public State getState() {
+    public ClusterState getState() {
         return state;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 3871aef..8fdd14c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -309,8 +310,8 @@ import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.PrintTimeDescri
 import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.TimeFromDatetimeDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.TimeFromUnixTimeInMsDescriptor;
 import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.YearMonthDurationComparatorDecriptor;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
 import edu.uci.ics.asterix.runtime.unnestingfunctions.std.RangeDescriptor;
 import edu.uci.ics.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor;
@@ -1081,23 +1082,26 @@ public class NonTaggedDataFormat implements IDataFormat {
     @Override
     public ITupleParserFactory createTupleParser(ARecordType recType, boolean delimitedFormat, char delimiter,
             char quote, boolean hasHeader) {
+        Map<String, String> conf = new HashMap<String, String>();
+        AsterixTupleParserFactory.InputDataFormat inputFormat = null;
         if (delimitedFormat) {
-            int n = recType.getFieldTypes().length;
-            IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
-            for (int i = 0; i < n; i++) {
-                ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
-                IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
-                if (vpf == null) {
-                    throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
-                }
-                fieldParserFactories[i] = vpf;
-            }
-            return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, hasHeader);
+            conf.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT);
+            conf.put(AsterixTupleParserFactory.KEY_DELIMITER, "" + delimiter);
+            inputFormat = InputDataFormat.DELIMITED;
         } else {
-            return new AdmSchemafullRecordParserFactory(recType);
+            conf.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_ADM);
+            inputFormat = InputDataFormat.ADM;
         }
+
+        if (hasHeader) {
+            conf.put(AsterixTupleParserFactory.HAS_HEADER, Boolean.TRUE.toString());
+        }
+        conf.put(AsterixTupleParserFactory.KEY_QUOTE, "" + quote);
+        return new AsterixTupleParserFactory(conf, recType, inputFormat);
     }
 
     @Override
     public INullWriterFactory getNullWriterFactory() {
         return AqlNullWriterFactory.INSTANCE;

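The configuration-map route used above can also be taken directly; a sketch for '|'-delimited text
with a header line, using only the keys and the constructor introduced in this change (the helper
class itself is hypothetical).

    import java.util.HashMap;
    import java.util.Map;

    import edu.uci.ics.asterix.om.types.ARecordType;
    import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
    import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
    import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;

    public class TupleParserFactoryExample {

        // Builds a parser factory for delimited text from the configuration keys
        // consumed by AsterixTupleParserFactory.
        public static ITupleParserFactory delimitedFactory(ARecordType recType) {
            Map<String, String> conf = new HashMap<String, String>();
            conf.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT);
            conf.put(AsterixTupleParserFactory.KEY_DELIMITER, "|");
            conf.put(AsterixTupleParserFactory.KEY_QUOTE, "\"");
            conf.put(AsterixTupleParserFactory.HAS_HEADER, Boolean.TRUE.toString());
            return new AsterixTupleParserFactory(conf, recType, InputDataFormat.DELIMITED);
        }
    }
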
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
index d9103ef..d42e0d9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -20,18 +20,16 @@ import java.io.InputStream;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.VSizeFrame;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 
 /**
- * An Abstract class implementation for ITupleParser. It provides common
+ * An abstract class implementation for ITupleParser. It provides common
  * functionality involved in parsing data in an external format and packing
  * frames with formed tuples.
  */
@@ -41,37 +39,34 @@ public abstract class AbstractTupleParser implements ITupleParser {
 
     protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
     protected DataOutput dos = tb.getDataOutput();
-    protected final FrameTupleAppender appender;
     protected final ARecordType recType;
     protected final IHyracksTaskContext ctx;
-    protected String filename;
 
     public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
-        appender = new FrameTupleAppender(new VSizeFrame(ctx));
         this.recType = recType;
         this.ctx = ctx;
     }
 
-    public void setFilename(String filename) {
-        this.filename = filename;
-    }
-
     public abstract IDataParser getDataParser();
 
+    public abstract ITupleForwardPolicy getTupleParserPolicy();
+
     @Override
     public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
         IDataParser parser = getDataParser();
+        ITupleForwardPolicy policy = getTupleParserPolicy();
         try {
             parser.initialize(in, recType, true);
+            policy.initialize(ctx, writer);
             while (true) {
                 tb.reset();
                 if (!parser.parse(tb.getDataOutput())) {
                     break;
                 }
                 tb.addFieldEndOffset();
-                addTupleToFrame(writer);
+                policy.addTuple(tb);
             }
-            appender.flush(writer, true);
+            policy.close();
         } catch (AsterixException ae) {
             throw new HyracksDataException(ae);
         } catch (IOException ioe) {
@@ -79,15 +74,4 @@ public abstract class AbstractTupleParser implements ITupleParser {
         }
     }
 
-    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
-        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            appender.flush(writer, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException("Tuple size(" + tb.getSize() + ") is greater than frame size("
-                        + AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize() + ")");
-            }
-        }
-
-    }
-
 }
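
For illustration, a hypothetical concrete parser showing what the refactored base class now asks of
subclasses: supply a data parser and a tuple-forwarding policy instead of appending frames directly.
The class below is not part of this change and assumes the same package as AbstractTupleParser so
that IDataParser resolves without an import.

    package edu.uci.ics.asterix.runtime.operators.file;

    import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
    import edu.uci.ics.asterix.om.types.ARecordType;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical subclass: the parser and the forwarding policy are injected by the caller.
    public class InjectedTupleParser extends AbstractTupleParser {

        private final IDataParser dataParser;
        private final ITupleForwardPolicy forwardPolicy;

        public InjectedTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
                ITupleForwardPolicy forwardPolicy) throws HyracksDataException {
            super(ctx, recType);
            this.dataParser = dataParser;
            this.forwardPolicy = forwardPolicy;
        }

        @Override
        public IDataParser getDataParser() {
            return dataParser;
        }

        @Override
        public ITupleForwardPolicy getTupleParserPolicy() {
            // parse() initializes this policy, hands it each built tuple, and closes it.
            return forwardPolicy;
        }
    }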