You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:04 UTC

[07/24] incubator-asterixdb git commit: Introduces Feeds 2.0

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 56fbcb5..ec727a9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -21,7 +21,6 @@ import java.rmi.RemoteException;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.metadata.MetadataException;
@@ -32,8 +31,6 @@ import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -497,16 +494,6 @@ public interface IMetadataNode extends Remote, Serializable {
     public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException,
             RemoteException;
 
-    public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId,
-            FeedActivityType... feedActivityFilter) throws MetadataException, RemoteException;
-
-    /**
-     * @param jobId
-     * @throws MetadataException
-     * @throws RemoteException
-     */
-    public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException;
-
     /**
      * @param jobId
      * @param dataverse
@@ -559,16 +546,7 @@ public interface IMetadataNode extends Remote, Serializable {
      */
     public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException;
 
-    /**
-     * @param jobId
-     *            A globally unique id for an active metadata transaction.
-     * @param feedId
-     *            A unique id for the feed
-     * @param feedActivity
-     */
-    public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity)
-            throws MetadataException, RemoteException;
-
+   
     /**
      * @param jobId
      * @param feedPolicy
@@ -588,17 +566,7 @@ public interface IMetadataNode extends Remote, Serializable {
     public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
             RemoteException;
 
-    /**
-     * @param jobId
-     * @param dataverse
-     * @param dataset
-     * @return
-     * @throws MetadataException
-     * @throws RemoteException
-     */
-    public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
-            RemoteException;
-
+   
     /**
      * Removes a library , acquiring local locks on behalf of the given
      * transaction id.
@@ -667,16 +635,28 @@ public interface IMetadataNode extends Remote, Serializable {
     public List<Feed> getDataverseFeeds(JobId jobId, String dataverseName) throws MetadataException, RemoteException;
 
     /**
+     * delete a give feed (ingestion) policy
+     * 
      * @param jobId
      * @param dataverseName
-     * @param deedName
+     * @param policyName
+     * @return
+     * @throws RemoteException
+     * @throws MetadataException
+     */
+    public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
+            RemoteException;
+
+    /**
+     * @param jobId
+     * @param dataverse
      * @return
      * @throws MetadataException
      * @throws RemoteException
      */
-    public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverseName, String deedName)
-            throws MetadataException, RemoteException;
-    
+    public List<FeedPolicy> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
+            RemoteException;
+
     /**
      * @param jobId
      *            A globally unique id for an active metadata transaction.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index c93c29e..0ac211e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -57,9 +57,9 @@ import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.FileStructur
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
@@ -123,7 +123,7 @@ public class MetadataBootstrap {
                 MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
                 MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
                 MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_DATASET,
-                MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET,
+                MetadataPrimaryIndexes.FEED_POLICY_DATASET,
                 MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
                 MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET };
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 7eae5cd..04e56b1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.metadata.bootstrap;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import edu.uci.ics.asterix.metadata.MetadataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 30ae9fa..b726ed6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -50,19 +50,21 @@ public final class MetadataRecordTypes {
     public static ARecordType FUNCTION_RECORDTYPE;
     public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
     public static ARecordType FEED_RECORDTYPE;
+    public static ARecordType PRIMARY_FEED_DETAILS_RECORDTYPE;
+    public static ARecordType SECONDARY_FEED_DETAILS_RECORDTYPE;
     public static ARecordType FEED_ADAPTER_CONFIGURATION_RECORDTYPE;
     public static ARecordType FEED_ACTIVITY_RECORDTYPE;
     public static ARecordType FEED_POLICY_RECORDTYPE;
     public static ARecordType POLICY_PARAMS_RECORDTYPE;
-    public static ARecordType FEED_ACTIVITY_DETAILS_RECORDTYPE;
     public static ARecordType LIBRARY_RECORDTYPE;
     public static ARecordType COMPACTION_POLICY_RECORDTYPE;
     public static ARecordType EXTERNAL_FILE_RECORDTYPE;
 
     /**
      * Create all metadata record types.
+     * @throws HyracksDataException 
      */
-    public static void init() throws MetadataException {
+    public static void init() throws MetadataException, HyracksDataException {
         // Attention: The order of these calls is important because some types
         // depend on other types being created first.
         // These calls are one "dependency chain".
@@ -90,10 +92,10 @@ public final class MetadataRecordTypes {
             FUNCTION_RECORDTYPE = createFunctionRecordType();
             DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
 
-            FEED_RECORDTYPE = createFeedRecordType();
             FEED_ADAPTER_CONFIGURATION_RECORDTYPE = createPropertiesRecordType();
-            FEED_ACTIVITY_DETAILS_RECORDTYPE = createPropertiesRecordType();
-            FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
+            PRIMARY_FEED_DETAILS_RECORDTYPE = createPrimaryFeedDetailsRecordType();
+            SECONDARY_FEED_DETAILS_RECORDTYPE = createSecondaryFeedDetailsRecordType();
+            FEED_RECORDTYPE = createFeedRecordType();
             FEED_POLICY_RECORDTYPE = createFeedPolicyRecordType();
             LIBRARY_RECORDTYPE = createLibraryRecordType();
 
@@ -498,50 +500,67 @@ public final class MetadataRecordTypes {
     public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 4;
     public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5;
     public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
-
-    private static ARecordType createFeedActivityRecordType() throws AsterixException {
-        AUnorderedListType unorderedPropertyListType = new AUnorderedListType(FEED_ACTIVITY_DETAILS_RECORDTYPE, null);
-        String[] fieldNames = { "DataverseName", "FeedName", "DatasetName", "ActivityId", "ActivityType", "Details",
-                "Timestamp" };
-        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
-                BuiltinType.ASTRING, unorderedPropertyListType, BuiltinType.ASTRING };
-        try {
-            return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-    }
+   
 
     public static final int FEED_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
     public static final int FEED_ARECORD_FEED_NAME_FIELD_INDEX = 1;
-    public static final int FEED_ARECORD_ADAPTER_NAME_FIELD_INDEX = 2;
-    public static final int FEED_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 3;
-    public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 4;
-    public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 5;
+    public static final int FEED_ARECORD_FUNCTION_FIELD_INDEX = 2;
+    public static final int FEED_ARECORD_FEED_TYPE_FIELD_INDEX = 3;
+    public static final int FEED_ARECORD_PRIMARY_TYPE_DETAILS_FIELD_INDEX = 4;
+    public static final int FEED_ARECORD_SECONDARY_TYPE_DETAILS_FIELD_INDEX = 5;
+    public static final int FEED_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
 
-    private static ARecordType createFeedRecordType() throws AsterixException {
+    
+    public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_NAME_FIELD_INDEX = 0;
+    public static final int FEED_ARECORD_PRIMARY_FIELD_DETAILS_ADAPTOR_CONFIGURATION_FIELD_INDEX = 1;
 
-        AUnorderedListType unorderedAdapterPropertyListType = new AUnorderedListType(
-                DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
+    public static final int FEED_ARECORD_SECONDARY_FIELD_DETAILS_SOURCE_FEED_NAME_FIELD_INDEX = 0;
+
+    private static ARecordType createFeedRecordType() throws AsterixException, HyracksDataException {
 
         List<IAType> feedFunctionUnionList = new ArrayList<IAType>();
         feedFunctionUnionList.add(BuiltinType.ANULL);
         feedFunctionUnionList.add(BuiltinType.ASTRING);
         AUnionType feedFunctionUnion = new AUnionType(feedFunctionUnionList, null);
 
-        String[] fieldNames = { "DataverseName", "FeedName", "AdapterName", "AdapterConfiguration", "Function",
-                "Timestamp" };
-        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                unorderedAdapterPropertyListType, feedFunctionUnion, BuiltinType.ASTRING };
+        List<IAType> primaryFeedTypeDetailsRecordUnionList = new ArrayList<IAType>();
+        primaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL);
+        primaryFeedTypeDetailsRecordUnionList.add(PRIMARY_FEED_DETAILS_RECORDTYPE);
+        AUnionType primaryRecordUnion = new AUnionType(primaryFeedTypeDetailsRecordUnionList, null);
 
-        try {
-            return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
+        List<IAType> secondaryFeedTypeDetailsRecordUnionList = new ArrayList<IAType>();
+        secondaryFeedTypeDetailsRecordUnionList.add(BuiltinType.ANULL);
+        secondaryFeedTypeDetailsRecordUnionList.add(SECONDARY_FEED_DETAILS_RECORDTYPE);
+        AUnionType secondaryRecordUnion = new AUnionType(secondaryFeedTypeDetailsRecordUnionList, null);
+
+        String[] fieldNames = { "DataverseName", "FeedName", "Function", "FeedType", "PrimaryTypeDetails",
+                "SecondaryTypeDetails", "Timestamp" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, feedFunctionUnion, BuiltinType.ASTRING,
+                primaryRecordUnion, secondaryRecordUnion, BuiltinType.ASTRING };
 
+        return new ARecordType("FeedRecordType", fieldNames, fieldTypes, true);
     }
 
+    public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_NAME_FIELD_INDEX = 0;
+    public static final int FEED_TYPE_PRIMARY_ARECORD_ADAPTER_CONFIGURATION_FIELD_INDEX = 1;
+    
+    private static final ARecordType createPrimaryFeedDetailsRecordType() throws AsterixException, HyracksDataException {
+        AUnorderedListType unorderedAdaptorPropertyListType = new AUnorderedListType(
+                DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE, null);
+
+        String[] fieldNames = { "AdapterName", "AdapterConfiguration" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING, unorderedAdaptorPropertyListType };
+        return new ARecordType(null, fieldNames, fieldTypes, true);
+    }
+
+    public static final int FEED_TYPE_SECONDARY_ARECORD_SOURCE_FEED_NAME_FIELD_INDEX = 0;
+
+    private static final ARecordType createSecondaryFeedDetailsRecordType() throws AsterixException, HyracksDataException {
+        String[] fieldNames = { "SourceFeedName" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING };
+        return new ARecordType(null, fieldNames, fieldTypes, true);
+    }
+    
     public static final int LIBRARY_ARECORD_DATAVERSENAME_FIELD_INDEX = 0;
     public static final int LIBRARY_ARECORD_NAME_FIELD_INDEX = 1;
     public static final int LIBRARY_ARECORD_TIMESTAMP_FIELD_INDEX = 2;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
index 6948dbc..037b04d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -16,8 +16,8 @@ package edu.uci.ics.asterix.metadata.cluster;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
 
 public abstract class AbstractClusterManagementWork implements IClusterManagementWork {
 
@@ -35,6 +35,8 @@ public abstract class AbstractClusterManagementWork implements IClusterManagemen
         this.workId = WorkIdGenerator.getNextWorkId();
     }
 
+   
+
     private static class WorkIdGenerator {
         private static AtomicInteger workId = new AtomicInteger(0);
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
index 68dcc4c..1157562 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -1,27 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.asterix.metadata.cluster;
 
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import java.util.Set;
+
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
 
 public class AddNodeWork extends AbstractClusterManagementWork {
 
-    private final int numberOfNodes;
+    private final int numberOfNodesRequested;
+    private final Set<String> deadNodes;
 
     @Override
     public WorkType getClusterManagementWorkType() {
         return WorkType.ADD_NODE;
     }
 
-    public AddNodeWork(int numberOfNodes, IClusterEventsSubscriber subscriber) {
+    public AddNodeWork(Set<String> deadNodes, int numberOfNodesRequested, IClusterEventsSubscriber subscriber) {
         super(subscriber);
-        this.numberOfNodes = numberOfNodes;
+        this.deadNodes = deadNodes;
+        this.numberOfNodesRequested = numberOfNodesRequested;
+    }
+
+    public int getNumberOfNodesRequested() {
+        return numberOfNodesRequested;
     }
 
-    public int getNumberOfNodes() {
-        return numberOfNodes;
+    public Set<String> getDeadNodes() {
+        return deadNodes;
     }
 
     @Override
     public String toString() {
-        return WorkType.ADD_NODE + " " + numberOfNodes + " requested by " + subscriber;
+        return WorkType.ADD_NODE + " " + numberOfNodesRequested + " requested by " + subscriber;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
index d578a77..7f3b575 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -1,6 +1,7 @@
 package edu.uci.ics.asterix.metadata.cluster;
 
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork;
+import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse;
 
 public class ClusterManagementWorkResponse implements IClusterManagementWorkResponse {
 
@@ -13,6 +14,7 @@ public class ClusterManagementWorkResponse implements IClusterManagementWorkResp
         this.status = Status.IN_PROGRESS;
     }
 
+   
     @Override
     public IClusterManagementWork getWork() {
         return work;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index fe7f4a4..47ca953 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.management.AsterixEventServiceClient;
@@ -39,7 +40,6 @@ import edu.uci.ics.asterix.event.service.ILookupService;
 import edu.uci.ics.asterix.event.service.ServiceProvider;
 import edu.uci.ics.asterix.event.util.PatternCreator;
 import edu.uci.ics.asterix.installer.schema.conf.Configuration;
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.metadata.api.IClusterManager;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
deleted file mode 100644
index dfc88ac..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package edu.uci.ics.asterix.metadata.cluster;
-
-import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-
-public interface IClusterManagementWorkResponse {
-
-    public enum Status {
-        IN_PROGRESS,
-        SUCCESS,
-        FAILURE
-    }
-
-    /**
-     * @return
-     */
-    public IClusterManagementWork getWork();
-
-    /**
-     * @return
-     */
-    public Status getStatus();
-
-    /**
-     * @param status
-     */
-    public void setStatus(Status status);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
index 90683d1..8b4f2aa 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -2,7 +2,7 @@ package edu.uci.ics.asterix.metadata.cluster;
 
 import java.util.Set;
 
-import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber;
 
 public class RemoveNodeWork extends AbstractClusterManagementWork {
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index ef23135..5df2c95 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -43,8 +43,7 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartition
 public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
 
     private final AqlSourceId id;
-    private final String datasourceDataverse;
-    private final String datasourceName;
+   private final IAType itemType;
     private final AqlDataSourceType datasourceType;
     protected IAType[] schemaTypes;
     protected INodeDomain domain;
@@ -58,19 +57,18 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
     }
 
     public AqlDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName,
-            AqlDataSourceType datasourceType) throws AlgebricksException {
+            IAType itemType, AqlDataSourceType datasourceType) throws AlgebricksException {
         this.id = id;
-        this.datasourceDataverse = datasourceDataverse;
-        this.datasourceName = datasourceName;
+        this.itemType = itemType;
         this.datasourceType = datasourceType;
     }
 
     public String getDatasourceDataverse() {
-        return datasourceDataverse;
+        return id.getDataverseName();
     }
 
     public String getDatasourceName() {
-        return datasourceName;
+        return id.getDatasourceName();
     }
 
     @Override
@@ -196,7 +194,10 @@ public abstract class AqlDataSource implements IDataSource<AqlSourceId> {
     public Map<String, Serializable> getProperties() {
         return properties;
     }
-
+    
+    public IAType getItemType() {
+        return itemType;
+    }
     public void setProperties(Map<String, Serializable> properties) {
         this.properties = properties;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 5055cea..0ec098d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -18,6 +18,7 @@ package edu.uci.ics.asterix.metadata.declared;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -37,7 +38,12 @@ import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOp
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.ICentralFeedManager;
 import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
@@ -65,23 +71,19 @@ import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.external.IndexingConstants;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
-import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
 import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -187,6 +189,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     private boolean asyncResults;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
+    private final ICentralFeedManager centralFeedManager;
 
     private final Dataverse defaultDataverse;
     private JobId jobId;
@@ -213,10 +216,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return config;
     }
 
-    public AqlMetadataProvider(Dataverse defaultDataverse) {
+    public AqlMetadataProvider(Dataverse defaultDataverse, ICentralFeedManager centralFeedManager) {
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        this.centralFeedManager = centralFeedManager;
     }
 
     public void setJobId(JobId jobId) {
@@ -330,10 +334,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
         try {
             switch (((AqlDataSource) dataSource).getDatasourceType()) {
-                case FEED: {
-                    // loading data from a feed
-                    return buildFeedIntakeRuntime(jobSpec, dataSource);
-                }
+                case FEED:
+                    return buildFeedCollectRuntime(jobSpec, dataSource);
                 case INTERNAL_DATASET: {
                     // querying an internal dataset
                     return buildInternalDatasetScan(jobSpec, scanVariables, minFilterVars, maxFilterVars, opSchema,
@@ -376,6 +378,110 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         }
     }
 
+@SuppressWarnings("rawtypes")
+public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedCollectRuntime(JobSpecification jobSpec,
+        IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+
+    FeedDataSource feedDataSource = (FeedDataSource) dataSource;
+    FeedCollectOperatorDescriptor feedCollector = null;
+
+    try {
+        ARecordType feedOutputType = (ARecordType) feedDataSource.getItemType();
+        ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                .getSerializerDeserializer(feedOutputType);
+        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+        FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+                BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+        if (feedPolicy == null) {
+            throw new AlgebricksException("Feed not configured with a policy");
+        }
+        feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+        FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
+                feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
+        feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
+                feedDataSource.getSourceFeedId(), (ARecordType) feedOutputType, feedDesc,
+                feedPolicy.getProperties(), feedDataSource.getLocation());
+
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedCollector,
+                determineLocationConstraint(feedDataSource));
+
+    } catch (Exception e) {
+        throw new AlgebricksException(e);
+    }
+}
+
+private AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
+        throws AsterixException {
+    String[] locationArray = null;
+    String locations = null;;
+    switch (feedDataSource.getSourceFeedType()) {
+        case PRIMARY:
+            switch (feedDataSource.getLocation()) {
+                case SOURCE_FEED_COMPUTE_STAGE:
+                    if (feedDataSource.getFeed().getFeedId().equals(feedDataSource.getSourceFeedId())) {
+                        locationArray = feedDataSource.getLocations();
+                    } else {
+                        Collection<FeedActivity> activities = centralFeedManager.getFeedLoadManager()
+                                .getFeedActivities();
+                        Iterator<FeedActivity> it = activities.iterator();
+                        FeedActivity activity = null;
+                        while (it.hasNext()) {
+                            activity = it.next();
+                            if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
+                                    && activity.getFeedName()
+                                            .equals(feedDataSource.getSourceFeedId().getFeedName())) {
+                                locations = activity.getFeedActivityDetails().get(
+                                        FeedActivityDetails.COMPUTE_LOCATIONS);
+                                locationArray = locations.split(",");
+                                break;
+                            }
+                        }
+                    }
+                    break;
+                case SOURCE_FEED_INTAKE_STAGE:
+                    locationArray = feedDataSource.getLocations();
+                    break;
+            }
+            break;
+        case SECONDARY:
+            Collection<FeedActivity> activities = centralFeedManager.getFeedLoadManager().getFeedActivities();
+            Iterator<FeedActivity> it = activities.iterator();
+            FeedActivity activity = null;
+            while (it.hasNext()) {
+                activity = it.next();
+                if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
+                        && activity.getFeedName().equals(feedDataSource.getSourceFeedId().getFeedName())) {
+                    switch (feedDataSource.getLocation()) {
+                        case SOURCE_FEED_INTAKE_STAGE:
+                            locations = activity.getFeedActivityDetails()
+                                    .get(FeedActivityDetails.COLLECT_LOCATIONS);
+                            break;
+                        case SOURCE_FEED_COMPUTE_STAGE:
+                            locations = activity.getFeedActivityDetails()
+                                    .get(FeedActivityDetails.COMPUTE_LOCATIONS);
+                            break;
+                    }
+                    break;
+                }
+            }
+
+            if (locations != null) {
+                locationArray = locations.split(",");
+            } else {
+                String message = "Unable to discover location(s) for source feed data hand-off "
+                        + feedDataSource.getSourceFeedId();
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(message);
+                }
+                throw new AsterixException(message);
+            }
+            break;
+    }
+    AlgebricksAbsolutePartitionConstraint locationConstraint = new AlgebricksAbsolutePartitionConstraint(
+            locationArray);
+    return locationConstraint;
+}
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
             LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
             List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
@@ -411,7 +517,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             JobGenContext context, Object implConfig) throws AlgebricksException, MetadataException {
         AqlSourceId asid = dataSource.getId();
         String dataverseName = asid.getDataverseName();
-        String datasetName = asid.getDatasetName();
+        String datasetName = asid.getDatasourceName();
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
 
         int[] minFilterFieldIndexes = null;
@@ -439,8 +545,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     private IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
-            Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys) throws AlgebricksException {
+            Map<String, String> configuration, IAType itemType, boolean isPKAutoGenerated, List<List<String>> primaryKeys)
+            throws AlgebricksException {
         IAdapterFactory adapterFactory;
         DatasourceAdapter adapterEntity;
         String adapterFactoryClassname;
@@ -458,6 +564,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
+            adapterFactory.configure(configuration, (ARecordType) itemType);
+
             // check to see if dataset is indexed
             Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(),
@@ -472,18 +580,11 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
                         iterator.remove();
                     }
                 }
-                ((IGenericAdapterFactory) adapterFactory).setFiles(files);
+                // TODO Check this call, result of merge from master! 
+                //  ((IGenericAdapterFactory) adapterFactory).setFiles(files);
             }
-
-            switch (adapterFactory.getAdapterType()) {
-                case GENERIC:
-                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType);
-                    break;
-                case TYPED:
-                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
-                    break;
-            }
-            return adapterFactory;
+            
+           return adapterFactory; 
         } catch (Exception e) {
             throw new AlgebricksException("Unable to create adapter " + e);
         }
@@ -539,78 +640,30 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
     }
 
-    @SuppressWarnings("rawtypes")
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
-            IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-
-        FeedDataSource feedDataSource = (FeedDataSource) dataSource;
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory> buildFeedIntakeRuntime(
+            JobSpecification jobSpec, PrimaryFeed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
+        Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+        factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx);
+        IFeedAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
-        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
-        AlgebricksPartitionConstraint constraint = null;
-
-        try {
-            factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
-            IAdapterFactory adapterFactory = factoryOutput.first;
-            ARecordType adapterOutputType = factoryOutput.second;
-            AdapterType adapterType = factoryOutput.third;
-
-            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
-                    .getSerializerDeserializer(adapterOutputType);
-            RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-            FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
-                    BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-            if (feedPolicy == null) {
-                throw new AlgebricksException("Feed not configured with a policy");
-            }
-            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-            switch (adapterType) {
-                case INTERNAL:
-                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
-                            feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                            .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
-                            feedDesc, feedPolicy.getProperties());
-                    break;
-                case EXTERNAL:
-                    String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0];
-                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
-                            libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
-                            .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
-                    break;
-            }
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
-            }
-            constraint = factoryOutput.first.getPartitionConstraint();
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
+        switch (factoryOutput.third) {
+            case INTERNAL:
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, adapterFactory,
+                        factoryOutput.second, policyAccessor);
+                break;
+            case EXTERNAL:
+                String libraryName = primaryFeed.getAdaptorName().trim()
+                        .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, adapterFactory
+                        .getClass().getName(), factoryOutput.second, policyAccessor);
+                break;
         }
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
-            JobSpecification jobSpec, String dataverse, String feedName, String dataset, IFeedMessage feedMessage,
-            String[] locations) throws AlgebricksException {
-        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations);
-        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, feedName,
-                dataset, feedMessage);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
-    }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
-            JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity)
-                    throws AlgebricksException {
-        List<String> feedLocations = new ArrayList<String>();
-        String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS)
-                .split(",");
-        for (String loc : ingestLocs) {
-            feedLocations.add(loc);
-        }
-        FeedConnectionId feedId = new FeedConnectionId(dataverse, feedName, dataset);
-        String[] locations = feedLocations.toArray(new String[] {});
-        IFeedMessage feedMessage = new EndFeedMessage(feedId);
-        return buildSendFeedMessageRuntime(jobSpec, dataverse, feedName, dataset, feedMessage, locations);
+        AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
+        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IFeedAdapterFactory>(feedIngestor,
+                partitionConstraint, adapterFactory);
     }
+   
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
@@ -934,7 +987,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
     }
 
     public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
-        Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasetName());
+        Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
         if (dataset == null) {
             throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
         }
@@ -942,7 +995,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
         IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, aqlId.getDataverseName(), tName).getDatatype();
         AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET
                 : AqlDataSourceType.INTERNAL_DATASET;
-        return new DatasetDataSource(aqlId, aqlId.getDataverseName(), aqlId.getDatasetName(), itemType, datasourceType);
+        return new DatasetDataSource(aqlId, aqlId.getDataverseName(), aqlId.getDatasourceName(), itemType, datasourceType);
     }
 
     @Override
@@ -971,7 +1024,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
             JobSpecification spec) throws AlgebricksException {
         String dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasetName();
+        String datasetName = dataSource.getId().getDatasourceName();
 
         Dataset dataset = findDataset(dataverseName, datasetName);
         if (dataset == null) {
@@ -1055,7 +1108,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
                     throws AlgebricksException {
 
-        String datasetName = dataSource.getId().getDatasetName();
+        String datasetName = dataSource.getId().getDatasourceName();
         Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
         if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse "
@@ -1173,7 +1226,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
             JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
         Dataset dataset = findDataset(dataverseName, datasetName);
         if (dataset == null) {
@@ -1234,7 +1287,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
 
         String indexName = dataSourceIndex.getId();
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
         IOperatorSchema inputSchema = new OperatorSchemaImpl();
         if (inputSchemas.length > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
index 4cee35d..be9f0e2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlSourceId.java
@@ -20,23 +20,23 @@ import java.io.File;
 public class AqlSourceId {
 
     private String dataverseName;
-    private String datasetName;
+    private String datasourceName;
 
-    public AqlSourceId(String dataverseName, String datasetName) {
+    public AqlSourceId(String dataverseName, String datasourceName) {
         this.dataverseName = dataverseName;
-        this.datasetName = datasetName;
+        this.datasourceName = datasourceName;
     }
 
     @Override
     public String toString() {
-        return dataverseName + File.pathSeparator + datasetName;
+        return dataverseName + File.pathSeparator + datasourceName;
     }
 
     public String getDataverseName() {
         return dataverseName;
     }
 
-    public String getDatasetName() {
-        return datasetName;
+    public String getDatasourceName() {
+        return datasourceName;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
index 059a10c..fd06e99 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
@@ -21,7 +21,7 @@ public class DatasetDataSource extends AqlDataSource {
 
     public DatasetDataSource(AqlSourceId id, String datasourceDataverse, String datasourceName, IAType itemType,
             AqlDataSourceType datasourceType) throws AlgebricksException {
-        super(id, datasourceDataverse, datasourceName, datasourceType);
+        super(id, datasourceDataverse, datasourceName, itemType, datasourceType);
         MetadataTransactionContext ctx = null;
         try {
             ctx = MetadataManager.INSTANCE.beginTransaction();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
index 695ae31..44402fc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedDataSource.java
@@ -14,32 +14,42 @@
  */
 package edu.uci.ics.asterix.metadata.declared;
 
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Feed.FeedType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
 public class FeedDataSource extends AqlDataSource {
 
     private Feed feed;
-    private final FeedConnectionId feedConnectionId;
+    private final FeedId sourceFeedId;
+    private final FeedType sourceFeedType;
+    private final ConnectionLocation location;
+    private final String targetDataset;
+    private final String[] locations;
+    private final int computeCardinality;
 
-    public FeedDataSource(AqlSourceId id, FeedConnectionId feedId, IAType itemType, AqlDataSourceType dataSourceType)
+    public FeedDataSource(AqlSourceId id, String targetDataset, IAType itemType, AqlDataSourceType dataSourceType,
+            FeedId sourceFeedId, FeedType sourceFeedType, ConnectionLocation location, String[] locations)
             throws AlgebricksException {
-        super(id, feedId.getDataverse(), feedId.getFeedName(), dataSourceType);
-        this.feedConnectionId = feedId;
-        feed = null;
+        super(id, id.getDataverseName(), id.getDatasourceName(), itemType, dataSourceType);
+        this.targetDataset = targetDataset;
+        this.sourceFeedId = sourceFeedId;
+        this.sourceFeedType = sourceFeedType;
+        this.location = location;
+        this.locations = locations;
+        this.computeCardinality = AsterixClusterProperties.INSTANCE.getParticipantNodes().size();
         MetadataTransactionContext ctx = null;
         try {
             MetadataManager.INSTANCE.acquireReadLatch();
             ctx = MetadataManager.INSTANCE.beginTransaction();
-            feed = MetadataManager.INSTANCE.getFeed(ctx, feedId.getDataverse(), feedId.getFeedName());
-            if (feed == null) {
-                throw new AlgebricksException("Unknown feed " + feedId);
-            }
+            this.feed = MetadataManager.INSTANCE.getFeed(ctx, id.getDataverseName(), id.getDatasourceName());
             MetadataManager.INSTANCE.commitTransaction(ctx);
             initFeedDataSource(itemType);
         } catch (Exception e) {
@@ -71,8 +81,20 @@ public class FeedDataSource extends AqlDataSource {
         return domain;
     }
 
-    public FeedConnectionId getFeedConnectionId() {
-        return feedConnectionId;
+    public String getTargetDataset() {
+        return targetDataset;
+    }
+
+    public FeedId getSourceFeedId() {
+        return sourceFeedId;
+    }
+
+    public ConnectionLocation getLocation() {
+        return location;
+    }
+
+    public String[] getLocations() {
+        return locations;
     }
 
     private void initFeedDataSource(IAType itemType) {
@@ -91,4 +113,12 @@ public class FeedDataSource extends AqlDataSource {
         };
         domain = domainForExternalData;
     }
+
+    public FeedType getSourceFeedType() {
+        return sourceFeedType;
+    }
+
+    public int getComputeCardinality() {
+        return computeCardinality;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
index 5701292..f4be491 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapter.java
@@ -4,8 +4,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
index cbf771b..fd4b6a7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FieldExtractingAdapterFactory.java
@@ -1,7 +1,10 @@
 package edu.uci.ics.asterix.metadata.declared;
 
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -40,11 +43,7 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory {
         return "FieldExtractingAdapter[ " + wrappedAdapterFactory.getName() + " ]";
     }
 
-    @Override
-    public AdapterType getAdapterType() {
-        return wrappedAdapterFactory.getAdapterType();
-    }
-
+  
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         return wrappedAdapterFactory.getPartitionConstraint();
@@ -55,5 +54,15 @@ public class FieldExtractingAdapterFactory implements IAdapterFactory {
         IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
         return new FieldExtractingAdapter(ctx, inRecDesc, outRecDesc, extractFields, rType, wrappedAdapter);
     }
+    
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        wrappedAdapterFactory.configure(configuration, outputType);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return wrappedAdapterFactory.getAdapterOutputType();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
index 2dc840e..c91e3f6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/LoadableDataSource.java
@@ -51,7 +51,7 @@ public class LoadableDataSource extends AqlDataSource {
 
     public LoadableDataSource(Dataset targetDataset, IAType itemType, String adapter, Map<String, String> properties)
             throws AlgebricksException, IOException {
-        super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source",
+        super(new AqlSourceId("loadable_dv", "loadable_ds"), "loadable_dv", "loadable_source", itemType,
                 AqlDataSourceType.LOADABLE);
         this.targetDataset = targetDataset;
         this.adapter = adapter;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
index 0938ccc..862092a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapter.java
@@ -18,7 +18,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.base.AMutableUUID;
 import edu.uci.ics.asterix.om.base.AUUID;
 import edu.uci.ics.asterix.om.pointables.ARecordPointable;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
index e371b2b..0a36ea8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/PKGeneratingAdapterFactory.java
@@ -14,8 +14,10 @@
  */
 package edu.uci.ics.asterix.metadata.declared;
 
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -58,11 +60,6 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory {
     }
 
     @Override
-    public AdapterType getAdapterType() {
-        return wrappedAdapterFactory.getAdapterType();
-    }
-
-    @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         return wrappedAdapterFactory.getPartitionConstraint();
     }
@@ -72,4 +69,14 @@ public class PKGeneratingAdapterFactory implements IAdapterFactory {
         IDatasourceAdapter wrappedAdapter = wrappedAdapterFactory.createAdapter(ctx, partition);
         return new PKGeneratingAdapter(ctx, inRecDesc, outRecDesc, inRecType, outRecType, wrappedAdapter, pkIndex);
     }
+
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        wrappedAdapterFactory.configure(configuration, outputType);        
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return wrappedAdapterFactory.getAdapterOutputType();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
index 39aa8ab..53b75a6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Feed.java
@@ -15,62 +15,67 @@
 
 package edu.uci.ics.asterix.metadata.entities;
 
-import java.util.Map;
-
+import edu.uci.ics.asterix.common.feeds.FeedId;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.MetadataCache;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 
 /**
- * Metadata describing a feed.
+ * Feed POJO
  */
 public class Feed implements IMetadataEntity {
 
     private static final long serialVersionUID = 1L;
 
-    private final String dataverseName;
-    private final String feedName;
-    private final String adapterName;
-    private final Map<String, String> adapterConfiguration;
-    private final FunctionSignature appliedFunction;
-
-    public Feed(String dataverseName, String datasetName, String adapterName, Map<String, String> adapterConfiguration,
-            FunctionSignature appliedFunction) {
-        this.dataverseName = dataverseName;
-        this.feedName = datasetName;
-        this.adapterName = adapterName;
-        this.adapterConfiguration = adapterConfiguration;
-        this.appliedFunction = appliedFunction;
+    /** A unique identifier for the feed */
+    protected final FeedId feedId;
+
+    /** The function that is to be applied on each incoming feed tuple **/
+    protected final FunctionSignature appliedFunction;
+
+    /** The type {@code FeedType} associated with the feed. **/
+    protected final FeedType feedType;
+
+    /** A string representation of the instance **/
+    protected final String displayName;
+
+    public enum FeedType {
+        /**
+         * A feed that derives its data from an external source.
+         */
+        PRIMARY,
+
+        /**
+         * A feed that derives its data from another primary or secondary feed.
+         */
+        SECONDARY
     }
 
-    public String getDataverseName() {
-        return dataverseName;
+    public Feed(String dataverseName, String datasetName, FunctionSignature appliedFunction, FeedType feedType) {
+        this.feedId = new FeedId(dataverseName, datasetName);
+        this.appliedFunction = appliedFunction;
+        this.feedType = feedType;
+        this.displayName = feedType + "(" + feedId + ")";
     }
 
-    public String getFeedName() {
-        return feedName;
+    public FeedId getFeedId() {
+        return feedId;
     }
 
-    public String getAdapterName() {
-        return adapterName;
+    public String getDataverseName() {
+        return feedId.getDataverse();
     }
 
-    public Map<String, String> getAdapterConfiguration() {
-        return adapterConfiguration;
+    public String getFeedName() {
+        return feedId.getFeedName();
     }
 
     public FunctionSignature getAppliedFunction() {
         return appliedFunction;
     }
 
-    @Override
-    public Object addToCache(MetadataCache cache) {
-        return cache.addFeedIfNotExists(this);
-    }
-
-    @Override
-    public Object dropFromCache(MetadataCache cache) {
-        return cache.dropFeed(this);
+    public FeedType getFeedType() {
+        return feedType;
     }
 
     @Override
@@ -81,13 +86,27 @@ public class Feed implements IMetadataEntity {
         if (!(other instanceof Feed)) {
             return false;
         }
-        Feed otherDataset = (Feed) other;
-        if (!otherDataset.dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!otherDataset.feedName.equals(feedName)) {
-            return false;
-        }
-        return true;
+        Feed otherFeed = (Feed) other;
+        return otherFeed.getFeedId().equals(feedId);
+    }
+
+    @Override
+    public int hashCode() {
+        return displayName.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return feedType + "(" + feedId + ")";
+    }
+
+    @Override
+    public Object addToCache(MetadataCache cache) {
+        return cache.addFeedIfNotExists(this);
+    }
+
+    @Override
+    public Object dropFromCache(MetadataCache cache) {
+        return cache.dropFeed(this);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
deleted file mode 100644
index 679276f..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.metadata.entities;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.metadata.MetadataCache;
-import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-
-/**
- * Metadata describing a feed activity record.
- */
-public class FeedActivity implements IMetadataEntity, Comparable<FeedActivity> {
-
-    private static final long serialVersionUID = 1L;
-
-    private int activityId;
-
-    private final String dataverseName;
-    private final String datasetName;
-    private final String feedName;
-
-    private String lastUpdatedTimestamp;
-    private FeedActivityType activityType;
-    private Map<String, String> feedActivityDetails;
-
-    public static enum FeedActivityType {
-        FEED_BEGIN,
-        FEED_FAILURE,
-        FEED_END
-    }
-
-    public static class FeedActivityDetails {
-        public static final String COMPUTE_LOCATIONS = "compute-locations";
-        public static final String INGEST_LOCATIONS = "ingest-locations";
-        public static final String STORAGE_LOCATIONS = "storage-locations";
-        public static final String TOTAL_INGESTED = "total-ingested";
-        public static final String INGESTION_RATE = "ingestion-rate";
-        public static final String EXCEPTION_LOCATION = "exception-location";
-        public static final String EXCEPTION_MESSAGE = "exception-message";
-        public static final String FEED_POLICY_NAME = "feed-policy-name";
-        public static final String SUPER_FEED_MANAGER_HOST = "super-feed-manager-host";
-        public static final String SUPER_FEED_MANAGER_PORT = "super-feed-manager-port";
-        public static final String FEED_NODE_FAILURE = "feed-node-failure";
-
-    }
-
-    public FeedActivity(String dataverseName, String feedName, String datasetName, FeedActivityType feedActivityType,
-            Map<String, String> feedActivityDetails) {
-        this.dataverseName = dataverseName;
-        this.feedName = feedName;
-        this.datasetName = datasetName;
-        this.activityType = feedActivityType;
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public Object addToCache(MetadataCache cache) {
-        return cache.addFeedActivityIfNotExists(this);
-    }
-
-    @Override
-    public Object dropFromCache(MetadataCache cache) {
-        return cache.dropFeedActivity(this);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FeedActivity)) {
-            return false;
-        }
-
-        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedActivityType().equals(activityType)) {
-            return false;
-        }
-        if (((FeedActivity) other).getActivityId() != (activityId)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityType + " " + activityId;
-    }
-
-    public FeedActivityType getFeedActivityType() {
-        return activityType;
-    }
-
-    public void setFeedActivityType(FeedActivityType feedActivityType) {
-        this.activityType = feedActivityType;
-    }
-
-    public String getLastUpdatedTimestamp() {
-        return lastUpdatedTimestamp;
-    }
-
-    public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) {
-        this.lastUpdatedTimestamp = lastUpdatedTimestamp;
-    }
-
-    public int getActivityId() {
-        return activityId;
-    }
-
-    public void setActivityId(int activityId) {
-        this.activityId = activityId;
-    }
-
-    public Map<String, String> getFeedActivityDetails() {
-        return feedActivityDetails;
-    }
-
-    public void setFeedActivityDetails(Map<String, String> feedActivityDetails) {
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public FeedActivityType getActivityType() {
-        return activityType;
-    }
-
-    public void setActivityType(FeedActivityType activityType) {
-        this.activityType = activityType;
-    }
-
-    @Override
-    public int compareTo(FeedActivity o) {
-        return o.getActivityId() - this.activityId;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
new file mode 100644
index 0000000..62f0d07
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/PrimaryFeed.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+/**
+ * A primary feed is one that derives its data from an external source via an adaptor.
+ * This class is a holder object for the metadata associated with a primary feed.
+ */
+public class PrimaryFeed extends Feed implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String adaptorName;
+    private final Map<String, String> adaptorConfiguration;
+
+    public PrimaryFeed(String dataverseName, String datasetName, String adaptorName,
+            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction) {
+        super(dataverseName, datasetName, appliedFunction, FeedType.PRIMARY);
+        this.adaptorName = adaptorName;
+        this.adaptorConfiguration = adaptorConfiguration;
+    }
+
+    public String getAdaptorName() {
+        return adaptorName;
+    }
+
+    public Map<String, String> getAdaptorConfiguration() {
+        return adaptorConfiguration;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!super.equals(other) || !(other instanceof PrimaryFeed)) {
+            return false;
+        }
+
+        PrimaryFeed otherFeed = (PrimaryFeed) other;
+        if (!otherFeed.getAdaptorName().equals(adaptorName)) {
+            return false;
+        }
+
+        for (Entry<String, String> entry : adaptorConfiguration.entrySet()) {
+            if (!(entry.getValue().equals(otherFeed.getAdaptorConfiguration().get(entry.getKey())))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "PrimaryFeed (" + adaptorName + ")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
new file mode 100644
index 0000000..c1f51ba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/SecondaryFeed.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+
+/**
+ * A secondary feed is one that derives its data from another (primary/secondary) feed.
+ * This class is a holder object for the metadata associated with a secondary feed.
+ */
+public class SecondaryFeed extends Feed implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String sourceFeedName;
+
+    public SecondaryFeed(String dataverseName, String feedName, String sourceFeedName, FunctionSignature appliedFunction) {
+        super(dataverseName, feedName, appliedFunction, FeedType.SECONDARY);
+        this.sourceFeedName = sourceFeedName;
+    }
+
+    public String getSourceFeedName() {
+        return sourceFeedName;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!super.equals(other) || !(other instanceof SecondaryFeed)) {
+            return false;
+        }
+
+        SecondaryFeed otherFeed = (SecondaryFeed) other;
+        if (!otherFeed.getSourceFeedName().equals(sourceFeedName)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "SecondaryFeed (" + feedId + ")" + "<--" + "(" + sourceFeedName + ")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 1bb34d2..580881d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -85,7 +85,7 @@ public class DatasourceAdapterTupleTranslator extends AbstractTupleTranslator<Da
         aString.setValue(adapter.getAdapterIdentifier().getNamespace());
         stringSerde.serialize(aString, tupleBuilder.getDataOutput());
         tupleBuilder.addFieldEndOffset();
-        aString.setValue(adapter.getAdapterIdentifier().getAdapterName());
+        aString.setValue(adapter.getAdapterIdentifier().getName());
         stringSerde.serialize(aString, tupleBuilder.getDataOutput());
         tupleBuilder.addFieldEndOffset();
 
@@ -101,7 +101,7 @@ public class DatasourceAdapterTupleTranslator extends AbstractTupleTranslator<Da
 
         // write field 1
         fieldValue.reset();
-        aString.setValue(adapter.getAdapterIdentifier().getAdapterName());
+        aString.setValue(adapter.getAdapterIdentifier().getName());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_ARECORD_NAME_FIELD_INDEX, fieldValue);