You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/03/16 00:36:28 UTC

[12/19] incubator-asterixdb git commit: Support Change Feeds and Ingestion of Records with MetaData

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
new file mode 100644
index 0000000..8ba1df3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Create a change feed with meta-data and test ingestion of records
+ * Expected Res : Success
+ * Date         : 24th Feb 2016
+ */
+
+drop dataverse Couchbase if exists;
+create dataverse Couchbase;
+use dataverse Couchbase;
+
+create type DocumentType as open{
+};
+
+create type CouchbaseMetaType as open{
+id:string,
+flags:int32,
+expiration:int32,
+cas:int64,
+rev:int64,
+vbid:int32,
+dtype:int32,
+};
+
+create dataset CouchFeedDataset(DocumentType) with meta(CouchbaseMetaType)primary key meta()."key";
+
+create feed CouchFeedWithMeta using FeedAdapter(
+    ("type-name"="DocumentType"),
+    ("meta-type-name"="CouchbaseMetaType"),
+    ("reader"="csv-with-record"),
+    ("path"="..."),
+    ("format"="record-with-meta")
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index d50a0a6..e6c9d53 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -29,25 +29,29 @@
         QueryOffsetPath="queries"
         QueryFileExtension=".aql">
     <test-group name="feeds">
-        <!-- Fails constantly and not clear what is intended
         <test-case FilePath="feeds">
-          <compilation-unit name="feeds_06">
-            <output-dir compare="Text">feeds_06</output-dir>
-          </compilation-unit>
-        </test-case> -->
+            <compilation-unit name="change-feed-with-meta-pk-in-meta">
+                <output-dir compare="Text">change-feed-with-meta-pk-in-meta</output-dir>
+            </compilation-unit>
+        </test-case>
         <test-case FilePath="feeds">
             <compilation-unit name="feed-with-filtered-dataset">
                 <output-dir compare="Text">feed-with-filtered-dataset</output-dir>
             </compilation-unit>
         </test-case>
         <test-case FilePath="feeds">
-            <compilation-unit name="feed-push-socket">
-                <output-dir compare="Text">feed-push-socket</output-dir>
+            <compilation-unit name="change-feed">
+                <output-dir compare="Text">change-feed</output-dir>
             </compilation-unit>
         </test-case>
         <test-case FilePath="feeds">
-            <compilation-unit name="drop-dataverse-with-disconnected-feed">
-                <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            <compilation-unit name="feed-with-meta-pk-in-meta">
+                <output-dir compare="Text">feed-with-meta-pk-in-meta</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feeds_07">
+                <output-dir compare="Text">feeds_07</output-dir>
             </compilation-unit>
         </test-case>
         <test-case FilePath="feeds">
@@ -55,9 +59,20 @@
                 <output-dir compare="Text">feed-with-external-parser</output-dir>
             </compilation-unit>
         </test-case>
+        <!-- Fails constantly and not clear what is intended
         <test-case FilePath="feeds">
-            <compilation-unit name="feeds_07">
-                <output-dir compare="Text">feeds_07</output-dir>
+          <compilation-unit name="feeds_06">
+            <output-dir compare="Text">feeds_06</output-dir>
+          </compilation-unit>
+        </test-case> -->
+        <test-case FilePath="feeds">
+            <compilation-unit name="drop-dataverse-with-disconnected-feed">
+                <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="feeds">
+            <compilation-unit name="feed-push-socket">
+                <output-dir compare="Text">feed-push-socket</output-dir>
             </compilation-unit>
         </test-case>
         <test-case FilePath="feeds">
@@ -115,12 +130,63 @@
                 <output-dir compare="Text">issue_230_feeds</output-dir>
             </compilation-unit>
         </test-case>
-<!-- 
-        <test-case FilePath="feeds">
+        <!--<test-case FilePath="feeds">
             <compilation-unit name="issue_711_feeds">
                 <output-dir compare="Text">issue_711_feeds</output-dir>
             </compilation-unit>
-        </test-case>  -->
+        </test-case>-->
+    </test-group>
+    <test-group name="upsert">
+        <test-case FilePath="upsert">
+            <compilation-unit name="filtered-dataset">
+                <output-dir compare="Text">filtered-dataset</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="nested-index">
+                <output-dir compare="Text">nested-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-rtree">
+                <output-dir compare="Text">primary-secondary-rtree</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="upsert-with-self-read">
+                <output-dir compare="Text">upsert-with-self-read</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="nullable-index">
+                <output-dir compare="Text">nullable-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="open-index">
+                <output-dir compare="Text">open-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-index">
+                <output-dir compare="Text">primary-index</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-btree">
+                <output-dir compare="Text">primary-secondary-btree</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="primary-secondary-inverted">
+                <output-dir compare="Text">primary-secondary-inverted</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="upsert">
+            <compilation-unit name="multiple-secondaries">
+                <output-dir compare="Text">multiple-secondaries</output-dir>
+            </compilation-unit>
+        </test-case>
     </test-group>
     <test-group name="external-library">
         <test-case FilePath="external-library">
@@ -1320,60 +1386,26 @@
         </test-case>
         -->
     </test-group>
-    <test-group name="upsert">
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-rtree">
-                <output-dir compare="Text">primary-secondary-rtree</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="upsert-with-self-read">
-                <output-dir compare="Text">upsert-with-self-read</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="filtered-dataset">
-                <output-dir compare="Text">filtered-dataset</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="nullable-index">
-                <output-dir compare="Text">nullable-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="nested-index">
-                <output-dir compare="Text">nested-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="open-index">
-                <output-dir compare="Text">open-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-index">
-                <output-dir compare="Text">primary-index</output-dir>
-            </compilation-unit>
-        </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-btree">
-                <output-dir compare="Text">primary-secondary-btree</output-dir>
+    <test-group name="dml">
+        <test-case FilePath="dml">
+            <compilation-unit name="insert-dataset-with-meta">
+                <output-dir compare="Text">insert-dataset-with-meta</output-dir>
+                <expected-error>insert into dataset is not supported on Datasets with Meta records</expected-error>
             </compilation-unit>
         </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="primary-secondary-inverted">
-                <output-dir compare="Text">primary-secondary-inverted</output-dir>
+        <test-case FilePath="dml">
+            <compilation-unit name="delete-dataset-with-meta">
+                <output-dir compare="Text">delete-dataset-with-meta</output-dir>
+                <expected-error>delete from dataset is not supported on Datasets with Meta records</expected-error>
             </compilation-unit>
         </test-case>
-        <test-case FilePath="upsert">
-            <compilation-unit name="multiple-secondaries">
-                <output-dir compare="Text">multiple-secondaries</output-dir>
+        <test-case FilePath="dml">
+            <compilation-unit name="upsert-dataset-with-meta">
+                <output-dir compare="Text">upsert-dataset-with-meta</output-dir>
+                <expected-error>upsert into dataset is not supported on Datasets with Meta records</expected-error>
             </compilation-unit>
         </test-case>
-    </test-group>
-    <test-group name="dml">
-         <test-case FilePath="dml">
+        <test-case FilePath="dml">
             <compilation-unit name="load-with-ngram-index">
                 <output-dir compare="Text">load-with-ngram-index</output-dir>
             </compilation-unit>
@@ -6120,6 +6152,12 @@
     </test-group>
     <test-group name="load">
         <test-case FilePath="load">
+            <compilation-unit name="dataset-with-meta">
+                <output-dir compare="Text">dataset-with-meta</output-dir>
+                <expected-error>load dataset is not supported on Datasets with Meta records</expected-error>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="load">
             <compilation-unit name="csv_01">
                 <output-dir compare="Text">csv_01</output-dir>
             </compilation-unit>
@@ -6178,13 +6216,13 @@
         <test-case FilePath="load">
             <compilation-unit name="issue14_query">
                 <output-dir compare="Text">issue14_query</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">
             <compilation-unit name="issue315_query">
                 <output-dir compare="Text">none</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2d994df..23215db 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5989,13 +5989,13 @@
         <test-case FilePath="load">
             <compilation-unit name="issue14_query">
                 <output-dir compare="Text">issue14_query</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">
             <compilation-unit name="issue315_query">
                 <output-dir compare="Text">none</output-dir>
-                <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+                <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
             </compilation-unit>
         </test-case>
         <test-case FilePath="load">

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 77a290d..8ceee62 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -45,7 +45,7 @@ import org.apache.asterix.common.configuration.TransactionLogDir;
 import org.apache.asterix.common.exceptions.AsterixException;
 
 public class AsterixPropertiesAccessor {
-    private static final Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
+    private static Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
 
     private final String instanceName;
     private final String metadataNodeName;
@@ -56,7 +56,7 @@ public class AsterixPropertiesAccessor {
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
     private final Map<String, ClusterPartition[]> nodePartitionsMap;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index ba32af2..947d7d7 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -65,6 +65,7 @@ public class FileTestServer implements ITestServer {
                         // This also could be due to the close() call
                     }
                 }
+
             }
         });
         listenerThread.start();
@@ -72,9 +73,11 @@ public class FileTestServer implements ITestServer {
 
     @Override
     public void stop() throws IOException, InterruptedException {
-        serverSocket.close();
-        if (listenerThread.isAlive()) {
-            listenerThread.join();
+        if (serverSocket.isBound()) {
+            serverSocket.close();
+            if (listenerThread.isAlive()) {
+                listenerThread.join();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 851acd4..a03ad1a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.adapter.factory;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
@@ -28,6 +29,8 @@ import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.provider.DataflowControllerProvider;
@@ -40,6 +43,7 @@ import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
 public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
@@ -53,6 +57,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     private boolean indexingOp;
     private boolean isFeed;
     private FileSplit[] feedLogFileSplits;
+    private ARecordType metaType;
 
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -66,7 +71,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
         return dataSourceFactory.getPartitionConstraint();
     }
 
@@ -74,14 +79,23 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
      * Runs on each node controller (after serialization-deserialization)
      */
     @Override
-    public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
-        restoreExternalObjects();
+    public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        try {
+            restoreExternalObjects();
+        } catch (AsterixException e) {
+            throw new HyracksDataException(e);
+        }
         IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
                 dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
-        return new GenericAdapter(controller);
+        if (isFeed) {
+            return new FeedAdapter((AbstractFeedDataFlowController) controller);
+        } else {
+            return new GenericAdapter(controller);
+        }
     }
 
-    private void restoreExternalObjects() throws Exception {
+    private void restoreExternalObjects() throws AsterixException {
         if (dataSourceFactory == null) {
             dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
             // create and configure parser factory
@@ -94,15 +108,19 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
             // create and configure parser factory
             dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
             dataParserFactory.setRecordType(recordType);
+            dataParserFactory.setMetaType(metaType);
             dataParserFactory.configure(configuration);
         }
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+            throws AsterixException {
         this.recordType = outputType;
+        this.metaType = metaType;
         this.configuration = configuration;
         dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
+
         dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
         prepare();
         ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
@@ -110,7 +128,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         nullifyExternalObjects();
     }
 
-    private void configureFeedLogManager() throws Exception {
+    private void configureFeedLogManager() throws AsterixException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
             feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
@@ -127,12 +145,13 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         }
     }
 
-    private void prepare() throws Exception {
+    private void prepare() throws AsterixException {
         if (dataSourceFactory.isIndexible() && (files != null)) {
             ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
         }
         dataSourceFactory.configure(configuration);
         dataParserFactory.setRecordType(recordType);
+        dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index f149ed3..49c5943 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.adapter.factory;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.ILookupReaderFactory;
 import org.apache.asterix.external.api.ILookupRecordReader;
 import org.apache.asterix.external.api.IRecordDataParser;
@@ -44,12 +45,12 @@ public class LookupAdapterFactory<T> implements Serializable {
     private static final long serialVersionUID = 1L;
     private IRecordDataParserFactory dataParserFactory;
     private ILookupReaderFactory readerFactory;
-    private ARecordType recordType;
-    private int[] ridFields;
+    private final ARecordType recordType;
+    private final int[] ridFields;
     private Map<String, String> configuration;
-    private boolean retainInput;
-    private boolean retainNull;
-    private INullWriterFactory iNullWriterFactory;
+    private final boolean retainInput;
+    private final boolean retainNull;
+    private final INullWriterFactory iNullWriterFactory;
 
     public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
             INullWriterFactory iNullWriterFactory) {
@@ -64,7 +65,6 @@ public class LookupAdapterFactory<T> implements Serializable {
             ExternalFileIndexAccessor snapshotAccessor, IFrameWriter writer) throws HyracksDataException {
         try {
             IRecordDataParser<T> dataParser = dataParserFactory.createRecordParser(ctx);
-            dataParser.configure(configuration, recordType);
             ILookupRecordReader<? extends T> reader = readerFactory.createRecordReader(ctx, partition,
                     snapshotAccessor);
             reader.configure(configuration);
@@ -76,7 +76,7 @@ public class LookupAdapterFactory<T> implements Serializable {
         }
     }
 
-    public void configure(Map<String, String> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws AsterixException {
         this.configuration = configuration;
         readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration);
         dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3965e5e..59a7514 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -21,9 +21,11 @@ package org.apache.asterix.external.api;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
@@ -50,7 +52,7 @@ public interface IAdapterFactory extends Serializable {
      * In the former case, the IP address is translated to a node controller id
      * running on the node with the given IP address.
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
 
     /**
      * Creates an instance of IDatasourceAdapter.
@@ -60,14 +62,21 @@ public interface IAdapterFactory extends Serializable {
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
+    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
 
     /**
      * @param configuration
      * @param outputType
+     * @param metaType
      * @throws Exception
      */
-    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
+    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+            throws AsterixException;
+
+    public default void configure(final Map<String, String> configuration, final ARecordType outputType)
+            throws AsterixException {
+        configure(configuration, outputType, null);
+    }
 
     /**
      * Gets the record type associated with the output of the adapter

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
index 252b43b..e5b22e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.api;
 
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.management.FeedId;
 
@@ -57,14 +58,14 @@ public interface IAdapterRuntimeManager {
     public void stop() throws Exception;
 
     /**
-     * @return feedId associated with the feed that is being ingested
+     * @return feedId associated with the feed that is being ingested.
      */
     public FeedId getFeedId();
 
     /**
-     * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
+     * @return an instance of the {@code FeedAdapter} in use.
      */
-    public IFeedAdapter getFeedAdapter();
+    public FeedAdapter getFeedAdapter();
 
     /**
      * @return state associated with the AdapterRuntimeManager. See {@code State}.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index d9ed131..33f262a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,46 +18,27 @@
  */
 package org.apache.asterix.external.api;
 
-import java.util.Map;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IDataFlowController {
 
-    /**
-     * Order of calls:
-     * 1. Constructor()
-     * 2. if record flow controller
-     * |-a. Set record reader
-     * |-b. Set record parser
-     * else
-     * |-a. Set stream parser
-     * 3. setTupleForwarder(forwarder)
-     * 4. configure(configuration,ctx)
-     * 5. start(writer)
-     *
-     * pause(), resume(), and stop() are only used with feeds
-     * pause is called after start when a feed is running and the system is overwhelmed with data.
-     * resume is called after the load goes down and we are ready to receive more data.
-     * stop is called to disconnect the feed. once stop is called, no other method is called.
-     *
-     */
-
+    //TODO: Refactor this interface. Remove writer from start() signature
     public void start(IFrameWriter writer) throws HyracksDataException;
 
-    public boolean stop() throws HyracksDataException;
-
-    public boolean pause() throws HyracksDataException;
-
-    public boolean resume() throws HyracksDataException;
-
-    public boolean handleException(Throwable th);
+    public default boolean pause() throws HyracksDataException {
+        throw new HyracksDataException("Method not implemented");
+    }
 
-    public ITupleForwarder getTupleForwarder();
+    public default boolean resume() throws HyracksDataException {
+        throw new HyracksDataException("Method not implemented");
+    }
 
-    public void setTupleForwarder(ITupleForwarder forwarder);
+    public default void flush() throws HyracksDataException {
+        throw new HyracksDataException("Method not implemented");
+    }
 
-    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws HyracksDataException;
+    public default boolean stop() throws HyracksDataException {
+        throw new HyracksDataException("Method not implemented");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index e680822..322e51f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -19,15 +19,11 @@
 package org.apache.asterix.external.api;
 
 import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableOrderedList;
 import org.apache.asterix.om.base.AMutableRecord;
@@ -43,24 +39,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public interface IDataParser {
 
-    /**
-     * @return The supported data sources
-     */
-    public DataSourceType getDataSourceType();
-
-    /**
-     * @param configuration
-     *            a set of configurations that comes from two sources.
-     *            1. The create adapter statement.
-     *            2. The query compiler.
-     * @param recordType
-     *            The expected record type
-     * @throws IOException
-     */
-    public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException;
-
     /*
-     * The following two static methods are expensive. right now, they are used by RSSFeeds and Twitter feed
+     * The following two static methods are expensive. Right now, they are used by the RSS and
+     * Twitter feeds.
      * TODO: Get rid of them
      */
     public static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
index 5c3845c..1fc97c9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
@@ -35,18 +35,27 @@ public interface IDataParserFactory extends Serializable {
      *         an instance of IDataParserFactory with STREAM data source type must implement IStreamDataParserFactory
      * @throws AsterixException
      */
-    public DataSourceType getDataSourceType() throws AsterixException;
+    public DataSourceType getDataSourceType();
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the
      * submitted AQL statement and any additional pairs added by the compiler
+     *
      * @param configuration
      */
-    public void configure(Map<String, String> configuration) throws Exception;
+    public void configure(Map<String, String> configuration) throws AsterixException;
 
     /**
      * Set the record type expected to be produced by parsers created by this factory
+     *
      * @param recordType
      */
     public void setRecordType(ARecordType recordType);
+
+    /**
+     * Set the meta record type expected to be produced by parsers created by this factory
+     *
+     * @param metaType
+     */
+    public void setMetaType(ARecordType metaType);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 1487cf1..b49a719 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -31,7 +32,6 @@ public interface IExternalDataSourceFactory extends Serializable {
     /**
      * The data source type indicates whether the data source produces a continuous stream or
      * a set of records
-     * @author amoudi
      */
     public enum DataSourceType {
         STREAM,
@@ -45,21 +45,24 @@ public interface IExternalDataSourceFactory extends Serializable {
 
     /**
      * Specifies on which locations this data source is expected to run.
+     *
      * @return
-     * @throws Exception
+     * @throws AsterixException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the
      * submitted AQL statement and any additional pairs added by the compiler
+     *
      * @param configuration
-     * @throws Exception
+     * @throws AsterixException
      */
-    public void configure(Map<String, String> configuration) throws Exception;
+    public void configure(Map<String, String> configuration) throws AsterixException;
 
     /**
      * Specify whether the external data source can be indexed
+     *
      * @return
      */
     public default boolean isIndexible() {
@@ -93,5 +96,4 @@ public interface IExternalDataSourceFactory extends Serializable {
         }
         return constraints;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
index 0b4277e..01ffd99 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
@@ -18,36 +18,39 @@
  */
 package org.apache.asterix.external.api;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 /**
- * @author amoudi
- *         This Interface represents the component responsible for adding record ids to tuples when indexing external data
+ * This interface represents the component responsible for adding record IDs to tuples when indexing external data.
  */
 public interface IExternalIndexer extends Serializable {
 
     /**
      * This method is called by an indexible datasource when the external source reader have been updated.
      * this gives a chance for the indexer to update its reader specific values (i,e. file name)
+     *
      * @param reader
-     *        the new reader
+     *            the new reader
      * @throws Exception
      */
-    public void reset(IRecordReader<?> reader) throws Exception;
+    public void reset(IRecordReader<?> reader) throws IOException;
 
     /**
      * This method is called by the dataflow controller with each tuple. the indexer is expected to append record ids to the tuple.
+     *
      * @param tb
      * @throws Exception
      */
-    public void index(ArrayTupleBuilder tb) throws Exception;
+    public void index(ArrayTupleBuilder tb) throws IOException;
 
     /**
      * This method returns the number of fields in the record id. It is used by tuple appender at the initialization step.
+     *
      * @return
      * @throws Exception
      */
-    public int getNumberOfFields() throws Exception;
+    public int getNumberOfFields() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
deleted file mode 100644
index 3261556..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFeedAdapter extends IDataSourceAdapter {
-    /**
-     * Pause the ingestion of data.
-     * @throws HyracksDataException
-     * @throws Exception
-     */
-    public boolean pause() throws HyracksDataException;
-
-    /**
-     * Resume the ingestion of data.
-     * @throws HyracksDataException
-     * @throws Exception
-     */
-    public boolean resume() throws HyracksDataException;
-
-    /**
-     * Discontinue the ingestion of data.
-     * @throws Exception
-     */
-    public boolean stop() throws Exception;
-
-    /**
-     * @param e
-     * @return true if the ingestion should continue post the exception else false
-     * @throws Exception
-     */
-    public boolean handleException(Throwable e);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
index fe30b38..accd730 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.asterix.external.indexing.ExternalFile;
 
 public interface IIndexibleExternalDataSource extends IExternalDataSourceFactory {
-    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception;
+    public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
 
     /**
      * Specify whether the external data source is configured for indexing

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
index ed5e7b5..c247ef6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
@@ -20,6 +20,4 @@ package org.apache.asterix.external.api;
 
 public interface IIndexingDatasource {
     public IExternalIndexer getIndexer();
-
-    public void setIndexer(IExternalIndexer indexer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
index 8cc4e27..b10452e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
@@ -18,15 +18,12 @@
  */
 package org.apache.asterix.external.api;
 
-import java.util.Map;
-
 import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IInputStreamProvider {
-    public AInputStream getInputStream() throws Exception;
-
-    public void configure(Map<String, String> configuration);
+    public AInputStream getInputStream() throws HyracksDataException;
 
     public void setFeedLogManager(FeedLogManager feedLogManager);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
index 3cc31dc..f52f7d3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
@@ -19,8 +19,15 @@
 package org.apache.asterix.external.api;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IInputStreamProviderFactory extends IExternalDataSourceFactory {
 
-    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception;
+    public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException;
+
+    @Override
+    public default DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
new file mode 100644
index 0000000..0f5ada4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+
+public interface IRecordConverter<I, O> {
+
+    public O convert(IRawRecord<? extends I> input) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 3cb8f37..bc97ed0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -29,9 +29,4 @@ public interface IRecordDataParser<T> extends IDataParser {
      * @throws Exception
      */
     public void parse(IRawRecord<? extends T> record, DataOutput out) throws IOException;
-
-    /**
-     * @return the record class
-     */
-    public Class<? extends T> getRecordClass();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
index 993d947..2ddbbcd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
@@ -18,15 +18,17 @@
  */
 package org.apache.asterix.external.api;
 
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordDataParserFactory<T> extends IDataParserFactory {
-    public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx)
-            throws HyracksDataException, AsterixException, IOException;
+    public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException;
+
+    public Class<?> getRecordClass();
 
-    public Class<? extends T> getRecordClass();
+    @Override
+    public default DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 769db19..b4d67d4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.api;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.asterix.external.util.FeedLogManager;
 
@@ -31,15 +30,6 @@ import org.apache.asterix.external.util.FeedLogManager;
 public interface IRecordReader<T> extends Closeable {
 
     /**
-     * Configure the reader with the set of key/value pairs passed by the compiler
-     * @param configuration
-     *        the set of key/value pairs
-     * @throws Exception
-     *         when the reader can't be configured (i,e. due to incorrect configuration, unreachable source, etc.)
-     */
-    public void configure(Map<String, String> configuration) throws Exception;
-
-    /**
      * @return true if the reader has more records remaining, false, otherwise.
      * @throws Exception
      *         if an error takes place

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index fdc54d6..c6adbc4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -19,10 +19,17 @@
 package org.apache.asterix.external.api;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
 
-    public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception;
+    public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException;
 
-    public Class<? extends T> getRecordClass();
+    public Class<?> getRecordClass();
+
+    @Override
+    public default DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
new file mode 100644
index 0000000..23c5bdd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithMetaDataAndPKParser<T> extends IRecordDataParser<RecordWithMetadataAndPK<T>> {
+
+    public void parseMeta(RecordWithMetadataAndPK<? extends T> record, DataOutput out) throws IOException;
+
+    public void appendKeys(RecordWithMetadataAndPK<T> record, ArrayTupleBuilder tb) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
new file mode 100644
index 0000000..4b97e8d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface IRecordWithMetaDataParser<T> extends IRecordDataParser<T> {
+    public void parseMeta(DataOutput out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
new file mode 100644
index 0000000..e6c114d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithPKDataParser<T> extends IRecordDataParser<T> {
+
+    public void appendKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
index f596efa..ca274e8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
 public interface IStreamDataParser extends IDataParser {
     /**
      * Sets the inputStream for the parser. called only for parsers that support InputStreams
+     *
+     * @throws IOException
      */
     public void setInputStream(InputStream in) throws IOException;
 
@@ -34,6 +36,7 @@ public interface IStreamDataParser extends IDataParser {
      *
      * @param out
      *            DataOutput instance that for writing the parser output.
+     * @throws IOException
      */
     public boolean parse(DataOutput out) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
index 828f71e..ad9acc6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -18,12 +18,11 @@
  */
 package org.apache.asterix.external.api;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IStreamDataParserFactory extends IDataParserFactory {
 
     public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
-            throws HyracksDataException, AsterixException;
+            throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
deleted file mode 100644
index d368c48..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-public interface IStreamFlowController extends IDataFlowController {
-    public void setStreamParser(IStreamDataParser dataParser);
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
index c0add02..22d0d6b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.api;
 
-import java.util.Map;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -36,8 +34,6 @@ public interface ITupleForwarder {
         FEED
     }
 
-    public void configure(Map<String, String> configuration) throws HyracksDataException;
-
     public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index a5aaac4..bbd93c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -18,37 +18,17 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.util.Map;
-
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractDataFlowController implements IDataFlowController {
 
-    protected ITupleForwarder tupleForwarder;
-    protected IHyracksTaskContext ctx;
-    protected Map<String, String> configuration;
+    protected final ITupleForwarder tupleForwarder;
+    protected final IHyracksTaskContext ctx;
 
-    @Override
-    public ITupleForwarder getTupleForwarder() {
-        return tupleForwarder;
-    }
-
-    @Override
-    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
-        this.tupleForwarder = tupleForwarder;
-    }
-
-    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
-        tupleForwarder.initialize(ctx, writer);
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
-        this.configuration = configuration;
+    public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder) {
         this.ctx = ctx;
+        this.tupleForwarder = tupleForwarder;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 0c58ee3..cf4ed19 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,50 +18,28 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.util.Map;
-
-
 import javax.annotation.Nonnull;
 
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public abstract class AbstractFeedDataFlowController implements IDataFlowController {
-    protected FeedTupleForwarder tupleForwarder;
-    protected IHyracksTaskContext ctx;
-    protected Map<String, String> configuration;
-    protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
-    protected ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
-    protected FeedLogManager feedLogManager;
+    protected final FeedTupleForwarder tupleForwarder;
+    protected final IHyracksTaskContext ctx;
+    protected final int numOfFields;
+    protected final ArrayTupleBuilder tb;
+    protected final FeedLogManager feedLogManager;
 
-    public AbstractFeedDataFlowController(@Nonnull FeedLogManager feedLogManager) {
+    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+            @Nonnull FeedLogManager feedLogManager, int numOfFields) {
         this.feedLogManager = feedLogManager;
-    }
-
-    @Override
-    public ITupleForwarder getTupleForwarder() {
-        return tupleForwarder;
-    }
-
-    @Override
-    public void setTupleForwarder(ITupleForwarder tupleForwarder) {
-        this.tupleForwarder = (FeedTupleForwarder) tupleForwarder;
-    }
-
-    protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
-        tupleForwarder.configure(configuration);
-        tupleForwarder.initialize(ctx, writer);
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
-        this.configuration = configuration;
+        this.numOfFields = numOfFields;
         this.ctx = ctx;
+        this.tupleForwarder = tupleForwarder;
+        this.tb = new ArrayTupleBuilder(numOfFields);
     }
 
     @Override
@@ -76,7 +54,13 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
         return true;
     }
 
+    @Override
     public void flush() throws HyracksDataException {
         tupleForwarder.flush();
     }
+
+    @Override
+    public abstract boolean stop() throws HyracksDataException;
+
+    public abstract boolean handleException(Throwable th);
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
new file mode 100644
index 0000000..8ec422f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordWithPKDataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * A {@link FeedRecordDataFlowController} for change feeds: it overrides {@code addPrimaryKeys} so that the keys
+ * of each record are appended to the tuple by an {@link IRecordWithPKDataParser}.
+ *
+ * @param <T>
+ *            the record payload type
+ */
+public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
+
+    // The same parser instance that is handed to the superclass constructor, kept here under its more specific
+    // type so that appendKeys can be called. This field hides the superclass field of the same name.
+    private final IRecordWithPKDataParser<T> dataParser;
+
+    /**
+     * Passes every argument on to the superclass constructor and keeps a typed reference to the parser.
+     *
+     * @param ctx
+     *            the task context, forwarded to the superclass
+     * @param tupleForwarder
+     *            the feed tuple forwarder, forwarded to the superclass
+     * @param feedLogManager
+     *            the feed log manager, forwarded to the superclass
+     * @param numOfOutputFields
+     *            the number of output fields, forwarded to the superclass
+     * @param dataParser
+     *            the parser used both for parsing records (by the superclass) and for appending their keys
+     * @param recordReader
+     *            the record reader, forwarded to the superclass
+     */
+    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+            final FeedLogManager feedLogManager, final int numOfOutputFields,
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        this.dataParser = dataParser;
+    }
+
+    /**
+     * Delegates to {@link IRecordWithPKDataParser#appendKeys} so that the keys of {@code record} are added to
+     * {@code tb}. The superclass declares this hook with an empty body.
+     *
+     * @param tb
+     *            the tuple builder that receives the keys
+     * @param record
+     *            the raw record whose keys are appended
+     * @throws IOException
+     *             if the parser's {@code appendKeys} call throws it
+     */
+    @Override
+    protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+        dataParser.appendKeys(tb, record);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
new file mode 100644
index 0000000..370eec0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+/**
+ * Change-feed variant of {@link FeedWithMetaDataFlowController}: it overrides {@code addPrimaryKeys} so that the
+ * parser appends the primary key to each tuple.
+ *
+ * @param <T>
+ *            the type parameter shared with the record reader and the parser
+ * @param <O>
+ *            the second type parameter of {@link RecordWithMetadataParser}
+ */
+public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
+
+    /**
+     * Forwards every argument unchanged to the superclass constructor; this class keeps no state of its own.
+     */
+    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+            final FeedLogManager feedLogManager, final int numOfOutputFields,
+            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+    }
+
+    /**
+     * Asks the parser to append the primary key to {@code tb}. The {@code record} argument is not used here.
+     * NOTE(review): {@code dataParser} is not declared in this class; presumably it is a field inherited from
+     * {@link FeedWithMetaDataFlowController} that holds the parser passed to the constructor. Confirm.
+     *
+     * @param tb
+     *            the tuple builder that receives the primary key
+     * @param record
+     *            the raw record being processed; ignored by this implementation
+     * @throws IOException
+     *             if the parser's {@code appendPK} call throws it
+     */
+    @Override
+    protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+        dataParser.appendPK(tb);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 93f866c..db95a6a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -52,20 +52,27 @@ public class CounterTimerTupleForwarder implements ITupleForwarder {
     private Object lock = new Object();
     private boolean activeTimer = false;
 
-    @Override
-    public void configure(Map<String, String> configuration) {
+    private CounterTimerTupleForwarder(int batchSize, long batchInterval) {
+        this.batchSize = batchSize;
+        this.batchInterval = batchInterval;
+        if (batchInterval > 0L) {
+            activeTimer = true;
+        }
+    }
+
+    // Factory method
+    public static CounterTimerTupleForwarder create(Map<String, String> configuration) {
+        int batchSize = -1;
+        long batchInterval = 0L;
         String propValue = configuration.get(BATCH_SIZE);
         if (propValue != null) {
             batchSize = Integer.parseInt(propValue);
-        } else {
-            batchSize = -1;
         }
-
         propValue = configuration.get(BATCH_INTERVAL);
         if (propValue != null) {
             batchInterval = Long.parseLong(propValue);
-            activeTimer = true;
         }
+        return new CounterTimerTupleForwarder(batchSize, batchInterval);
     }
 
     @Override
@@ -152,6 +159,5 @@ public class CounterTimerTupleForwarder implements ITupleForwarder {
                 e.printStackTrace();
             }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 3408af9..2cc3c66 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
@@ -29,7 +30,9 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -37,12 +40,13 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     protected final IRecordDataParser<T> dataParser;
     protected final IRecordReader<? extends T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
-    protected long interval;
+    protected final long interval = 1000;
     protected boolean failed = false;
 
-    public FeedRecordDataFlowController(@Nonnull FeedLogManager feedLogManager,
-            @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<T> recordReader) {
-        super(feedLogManager);
+    public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+            @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
+            @Nonnull IRecordReader<T> recordReader) {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         recordReader.setFeedLogManager(feedLogManager);
@@ -54,7 +58,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         HyracksDataException hde = null;
         try {
             failed = false;
-            initializeTupleForwarder(writer);
+            tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 if (record == null) {
@@ -65,6 +69,8 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                 tb.reset();
                 dataParser.parse(record, tb.getDataOutput());
                 tb.addFieldEndOffset();
+                addMetaPart(tb, record);
+                addPrimaryKeys(tb, record);
                 if (tb.getSize() > tupleForwarder.getMaxRecordSize()) {
                     // log
                     feedLogManager.logRecord(record.toString(), ExternalDataConstants.LARGE_RECORD_ERROR_MESSAGE);
@@ -96,6 +102,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         }
     }
 
+    protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+    }
+
+    protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+    }
+
     private void closeSignal() {
         synchronized (closed) {
             closed.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 580e350..f233971 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -19,25 +19,28 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.IStreamFlowController;
 import org.apache.asterix.external.input.stream.AInputStream;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedStreamDataFlowController extends AbstractFeedDataFlowController implements IStreamFlowController {
+public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
 
-    private IStreamDataParser dataParser;
-    private AInputStream stream;
+    private final IStreamDataParser dataParser;
+    private final AInputStream stream;
 
-    public FeedStreamDataFlowController(FeedLogManager feedLogManager) {
-        super(feedLogManager);
+    public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+            FeedLogManager feedLogManager, int numOfFields, IStreamDataParser streamParser, AInputStream inputStream) {
+        super(ctx, tupleForwarder, feedLogManager, numOfFields);
+        this.dataParser = streamParser;
+        this.stream = inputStream;
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
-            initializeTupleForwarder(writer);
+            tupleForwarder.initialize(ctx, writer);
             while (true) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
@@ -80,13 +83,4 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController
         }
         return handled;
     }
-
-    @Override
-    public void setStreamParser(IStreamDataParser dataParser) {
-        this.dataParser = dataParser;
-    }
-
-    public void setStream(AInputStream stream) {
-        this.stream = stream;
-    }
 }