You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/03/16 00:36:28 UTC
[12/19] incubator-asterixdb git commit: Support Change Feeds and
Ingestion of Records with MetaData
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm b/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
new file mode 100644
index 0000000..8ba1df3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.1.adm
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data and test ingestion of records
+ * Expected Res : Success
+ * Date : 24th Feb 2016
+ */
+
+drop dataverse Couchbase if exists;
+create dataverse Couchbase;
+use dataverse Couchbase;
+
+create type DocumentType as open{
+};
+
+create type CouchbaseMetaType as open{
+id:string,
+flags:int32,
+expiration:int32,
+cas:int64,
+rev:int64,
+vbid:int32,
+dtype:int32,
+};
+
+create dataset CouchFeedDataset(DocumentType) with meta(CouchbaseMetaType)primary key meta()."key";
+
+create feed CouchFeedWithMeta using FeedAdapter(
+ ("type-name"="DocumentType"),
+ ("meta-type-name"="CouchbaseMetaType"),
+ ("reader"="csv-with-record"),
+ ("path"="..."),
+ ("format"="record-with-meta")
+);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index d50a0a6..e6c9d53 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -29,25 +29,29 @@
QueryOffsetPath="queries"
QueryFileExtension=".aql">
<test-group name="feeds">
- <!-- Fails constantly and not clear what is intended
<test-case FilePath="feeds">
- <compilation-unit name="feeds_06">
- <output-dir compare="Text">feeds_06</output-dir>
- </compilation-unit>
- </test-case> -->
+ <compilation-unit name="change-feed-with-meta-pk-in-meta">
+ <output-dir compare="Text">change-feed-with-meta-pk-in-meta</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="feeds">
<compilation-unit name="feed-with-filtered-dataset">
<output-dir compare="Text">feed-with-filtered-dataset</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
- <compilation-unit name="feed-push-socket">
- <output-dir compare="Text">feed-push-socket</output-dir>
+ <compilation-unit name="change-feed">
+ <output-dir compare="Text">change-feed</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
- <compilation-unit name="drop-dataverse-with-disconnected-feed">
- <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+ <compilation-unit name="feed-with-meta-pk-in-meta">
+ <output-dir compare="Text">feed-with-meta-pk-in-meta</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feeds_07">
+ <output-dir compare="Text">feeds_07</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
@@ -55,9 +59,20 @@
<output-dir compare="Text">feed-with-external-parser</output-dir>
</compilation-unit>
</test-case>
+ <!-- Fails constantly and not clear what is intended
<test-case FilePath="feeds">
- <compilation-unit name="feeds_07">
- <output-dir compare="Text">feeds_07</output-dir>
+ <compilation-unit name="feeds_06">
+ <output-dir compare="Text">feeds_06</output-dir>
+ </compilation-unit>
+ </test-case> -->
+ <test-case FilePath="feeds">
+ <compilation-unit name="drop-dataverse-with-disconnected-feed">
+ <output-dir compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feed-push-socket">
+ <output-dir compare="Text">feed-push-socket</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
@@ -115,12 +130,63 @@
<output-dir compare="Text">issue_230_feeds</output-dir>
</compilation-unit>
</test-case>
-<!--
- <test-case FilePath="feeds">
+ <!--<test-case FilePath="feeds">
<compilation-unit name="issue_711_feeds">
<output-dir compare="Text">issue_711_feeds</output-dir>
</compilation-unit>
- </test-case> -->
+ </test-case>-->
+ </test-group>
+ <test-group name="upsert">
+ <test-case FilePath="upsert">
+ <compilation-unit name="filtered-dataset">
+ <output-dir compare="Text">filtered-dataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="nested-index">
+ <output-dir compare="Text">nested-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-rtree">
+ <output-dir compare="Text">primary-secondary-rtree</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="upsert-with-self-read">
+ <output-dir compare="Text">upsert-with-self-read</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="nullable-index">
+ <output-dir compare="Text">nullable-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="open-index">
+ <output-dir compare="Text">open-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-index">
+ <output-dir compare="Text">primary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-btree">
+ <output-dir compare="Text">primary-secondary-btree</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="primary-secondary-inverted">
+ <output-dir compare="Text">primary-secondary-inverted</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="upsert">
+ <compilation-unit name="multiple-secondaries">
+ <output-dir compare="Text">multiple-secondaries</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="external-library">
<test-case FilePath="external-library">
@@ -1320,60 +1386,26 @@
</test-case>
-->
</test-group>
- <test-group name="upsert">
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-rtree">
- <output-dir compare="Text">primary-secondary-rtree</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="upsert-with-self-read">
- <output-dir compare="Text">upsert-with-self-read</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="filtered-dataset">
- <output-dir compare="Text">filtered-dataset</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="nullable-index">
- <output-dir compare="Text">nullable-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="nested-index">
- <output-dir compare="Text">nested-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="open-index">
- <output-dir compare="Text">open-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-index">
- <output-dir compare="Text">primary-index</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-btree">
- <output-dir compare="Text">primary-secondary-btree</output-dir>
+ <test-group name="dml">
+ <test-case FilePath="dml">
+ <compilation-unit name="insert-dataset-with-meta">
+ <output-dir compare="Text">insert-dataset-with-meta</output-dir>
+ <expected-error>insert into dataset is not supported on Datasets with Meta records</expected-error>
</compilation-unit>
</test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="primary-secondary-inverted">
- <output-dir compare="Text">primary-secondary-inverted</output-dir>
+ <test-case FilePath="dml">
+ <compilation-unit name="delete-dataset-with-meta">
+ <output-dir compare="Text">delete-dataset-with-meta</output-dir>
+ <expected-error>delete from dataset is not supported on Datasets with Meta records</expected-error>
</compilation-unit>
</test-case>
- <test-case FilePath="upsert">
- <compilation-unit name="multiple-secondaries">
- <output-dir compare="Text">multiple-secondaries</output-dir>
+ <test-case FilePath="dml">
+ <compilation-unit name="upsert-dataset-with-meta">
+ <output-dir compare="Text">upsert-dataset-with-meta</output-dir>
+ <expected-error>upsert into dataset is not supported on Datasets with Meta records</expected-error>
</compilation-unit>
</test-case>
- </test-group>
- <test-group name="dml">
- <test-case FilePath="dml">
+ <test-case FilePath="dml">
<compilation-unit name="load-with-ngram-index">
<output-dir compare="Text">load-with-ngram-index</output-dir>
</compilation-unit>
@@ -6120,6 +6152,12 @@
</test-group>
<test-group name="load">
<test-case FilePath="load">
+ <compilation-unit name="dataset-with-meta">
+ <output-dir compare="Text">dataset-with-meta</output-dir>
+ <expected-error>load dataset is not supported on Datasets with Meta records</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="load">
<compilation-unit name="csv_01">
<output-dir compare="Text">csv_01</output-dir>
</compilation-unit>
@@ -6178,13 +6216,13 @@
<test-case FilePath="load">
<compilation-unit name="issue14_query">
<output-dir compare="Text">issue14_query</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+ <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
<compilation-unit name="issue315_query">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+ <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2d994df..23215db 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5989,13 +5989,13 @@
<test-case FilePath="load">
<compilation-unit name="issue14_query">
<output-dir compare="Text">issue14_query</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+ <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
<compilation-unit name="issue315_query">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.asterix.common.exceptions.AsterixException: Unspecified ("reader" or "format") parameter for local filesystem adapter</expected-error>
+ <expected-error>org.apache.asterix.common.exceptions.AsterixException: The parameter parser must be specified</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="load">
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 77a290d..8ceee62 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -45,7 +45,7 @@ import org.apache.asterix.common.configuration.TransactionLogDir;
import org.apache.asterix.common.exceptions.AsterixException;
public class AsterixPropertiesAccessor {
- private static final Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
+ private static Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
private final String instanceName;
private final String metadataNodeName;
@@ -56,7 +56,7 @@ public class AsterixPropertiesAccessor {
private final Map<String, String> transactionLogDirs;
private final Map<String, String> asterixBuildProperties;
private final Map<String, ClusterPartition[]> nodePartitionsMap;
- private SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
public AsterixPropertiesAccessor() throws AsterixException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
index ba32af2..947d7d7 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/server/FileTestServer.java
@@ -65,6 +65,7 @@ public class FileTestServer implements ITestServer {
// This also could be due to the close() call
}
}
+
}
});
listenerThread.start();
@@ -72,9 +73,11 @@ public class FileTestServer implements ITestServer {
@Override
public void stop() throws IOException, InterruptedException {
- serverSocket.close();
- if (listenerThread.isAlive()) {
- listenerThread.join();
+ if (serverSocket.isBound()) {
+ serverSocket.close();
+ if (listenerThread.isAlive()) {
+ listenerThread.join();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 851acd4..a03ad1a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.adapter.factory;
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
@@ -28,6 +29,8 @@ import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IIndexibleExternalDataSource;
import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.provider.DataflowControllerProvider;
@@ -40,6 +43,7 @@ import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.std.file.FileSplit;
public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
@@ -53,6 +57,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
private boolean indexingOp;
private boolean isFeed;
private FileSplit[] feedLogFileSplits;
+ private ARecordType metaType;
@Override
public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -66,7 +71,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception {
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException {
return dataSourceFactory.getPartitionConstraint();
}
@@ -74,14 +79,23 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
* Runs on each node controller (after serialization-deserialization)
*/
@Override
- public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- restoreExternalObjects();
+ public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ try {
+ restoreExternalObjects();
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
- return new GenericAdapter(controller);
+ if (isFeed) {
+ return new FeedAdapter((AbstractFeedDataFlowController) controller);
+ } else {
+ return new GenericAdapter(controller);
+ }
}
- private void restoreExternalObjects() throws Exception {
+ private void restoreExternalObjects() throws AsterixException {
if (dataSourceFactory == null) {
dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
// create and configure parser factory
@@ -94,15 +108,19 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
// create and configure parser factory
dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
dataParserFactory.setRecordType(recordType);
+ dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
}
}
@Override
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+ throws AsterixException {
this.recordType = outputType;
+ this.metaType = metaType;
this.configuration = configuration;
dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
+
dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
prepare();
ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory, dataParserFactory);
@@ -110,7 +128,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
nullifyExternalObjects();
}
- private void configureFeedLogManager() throws Exception {
+ private void configureFeedLogManager() throws AsterixException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
@@ -127,12 +145,13 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
}
}
- private void prepare() throws Exception {
+ private void prepare() throws AsterixException {
if (dataSourceFactory.isIndexible() && (files != null)) {
((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
}
dataSourceFactory.configure(configuration);
dataParserFactory.setRecordType(recordType);
+ dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index f149ed3..49c5943 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.adapter.factory;
import java.io.Serializable;
import java.util.Map;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.ILookupReaderFactory;
import org.apache.asterix.external.api.ILookupRecordReader;
import org.apache.asterix.external.api.IRecordDataParser;
@@ -44,12 +45,12 @@ public class LookupAdapterFactory<T> implements Serializable {
private static final long serialVersionUID = 1L;
private IRecordDataParserFactory dataParserFactory;
private ILookupReaderFactory readerFactory;
- private ARecordType recordType;
- private int[] ridFields;
+ private final ARecordType recordType;
+ private final int[] ridFields;
private Map<String, String> configuration;
- private boolean retainInput;
- private boolean retainNull;
- private INullWriterFactory iNullWriterFactory;
+ private final boolean retainInput;
+ private final boolean retainNull;
+ private final INullWriterFactory iNullWriterFactory;
public LookupAdapterFactory(ARecordType recordType, int[] ridFields, boolean retainInput, boolean retainNull,
INullWriterFactory iNullWriterFactory) {
@@ -64,7 +65,6 @@ public class LookupAdapterFactory<T> implements Serializable {
ExternalFileIndexAccessor snapshotAccessor, IFrameWriter writer) throws HyracksDataException {
try {
IRecordDataParser<T> dataParser = dataParserFactory.createRecordParser(ctx);
- dataParser.configure(configuration, recordType);
ILookupRecordReader<? extends T> reader = readerFactory.createRecordReader(ctx, partition,
snapshotAccessor);
reader.configure(configuration);
@@ -76,7 +76,7 @@ public class LookupAdapterFactory<T> implements Serializable {
}
}
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws AsterixException {
this.configuration = configuration;
readerFactory = LookupReaderFactoryProvider.getLookupReaderFactory(configuration);
dataParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider.getDataParserFactory(configuration);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3965e5e..59a7514 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -21,9 +21,11 @@ package org.apache.asterix.external.api;
import java.io.Serializable;
import java.util.Map;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
@@ -50,7 +52,7 @@ public interface IAdapterFactory extends Serializable {
* In the former case, the IP address is translated to a node controller id
* running on the node with the given IP address.
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
/**
* Creates an instance of IDatasourceAdapter.
@@ -60,14 +62,21 @@ public interface IAdapterFactory extends Serializable {
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
+ public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
/**
* @param configuration
* @param outputType
+ * @param metaType
* @throws Exception
*/
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
+ public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+ throws AsterixException;
+
+ public default void configure(final Map<String, String> configuration, final ARecordType outputType)
+ throws AsterixException {
+ configure(configuration, outputType, null);
+ }
/**
* Gets the record type associated with the output of the adapter
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
index 252b43b..e5b22e9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterRuntimeManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.api;
+import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
import org.apache.asterix.external.feed.management.FeedId;
@@ -57,14 +58,14 @@ public interface IAdapterRuntimeManager {
public void stop() throws Exception;
/**
- * @return feedId associated with the feed that is being ingested
+ * @return feedId associated with the feed that is being ingested.
*/
public FeedId getFeedId();
/**
- * @return the instance of the feed adapter (an implementation of {@code IFeedAdapter}) in use.
+ * @return an instance of the {@code FeedAdapter} in use.
*/
- public IFeedAdapter getFeedAdapter();
+ public FeedAdapter getFeedAdapter();
/**
* @return state associated with the AdapterRuntimeManager. See {@code State}.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index d9ed131..33f262a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,46 +18,27 @@
*/
package org.apache.asterix.external.api;
-import java.util.Map;
-
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IDataFlowController {
- /**
- * Order of calls:
- * 1. Constructor()
- * 2. if record flow controller
- * |-a. Set record reader
- * |-b. Set record parser
- * else
- * |-a. Set stream parser
- * 3. setTupleForwarder(forwarder)
- * 4. configure(configuration,ctx)
- * 5. start(writer)
- *
- * pause(), resume(), and stop() are only used with feeds
- * pause is called after start when a feed is running and the system is overwhelmed with data.
- * resume is called after the load goes down and we are ready to receive more data.
- * stop is called to disconnect the feed. once stop is called, no other method is called.
- *
- */
-
+ //TODO: Refactor this interface. Remove writer from start() signature
public void start(IFrameWriter writer) throws HyracksDataException;
- public boolean stop() throws HyracksDataException;
-
- public boolean pause() throws HyracksDataException;
-
- public boolean resume() throws HyracksDataException;
-
- public boolean handleException(Throwable th);
+ public default boolean pause() throws HyracksDataException {
+ throw new HyracksDataException("Method not implemented");
+ }
- public ITupleForwarder getTupleForwarder();
+ public default boolean resume() throws HyracksDataException {
+ throw new HyracksDataException("Method not implemented");
+ }
- public void setTupleForwarder(ITupleForwarder forwarder);
+ public default void flush() throws HyracksDataException {
+ throw new HyracksDataException("Method not implemented");
+ }
- public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) throws HyracksDataException;
+ public default boolean stop() throws HyracksDataException {
+ throw new HyracksDataException("Method not implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index e680822..322e51f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -19,15 +19,11 @@
package org.apache.asterix.external.api;
import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableOrderedList;
import org.apache.asterix.om.base.AMutableRecord;
@@ -43,24 +39,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public interface IDataParser {
- /**
- * @return The supported data sources
- */
- public DataSourceType getDataSourceType();
-
- /**
- * @param configuration
- * a set of configurations that comes from two sources.
- * 1. The create adapter statement.
- * 2. The query compiler.
- * @param recordType
- * The expected record type
- * @throws IOException
- */
- public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException;
-
/*
- * The following two static methods are expensive. right now, they are used by RSSFeeds and Twitter feed
+ * The following two static methods are expensive. right now, they are used by RSSFeeds and
+ * Twitter feed
* TODO: Get rid of them
*/
public static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
index 5c3845c..1fc97c9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParserFactory.java
@@ -35,18 +35,27 @@ public interface IDataParserFactory extends Serializable {
* an instance of IDataParserFactory with STREAM data source type must implement IStreamDataParserFactory
* @throws AsterixException
*/
- public DataSourceType getDataSourceType() throws AsterixException;
+ public DataSourceType getDataSourceType();
/**
* Configure the data parser factory. The passed map contains key value pairs from the
* submitted AQL statement and any additional pairs added by the compiler
+ *
* @param configuration
*/
- public void configure(Map<String, String> configuration) throws Exception;
+ public void configure(Map<String, String> configuration) throws AsterixException;
/**
* Set the record type expected to be produced by parsers created by this factory
+ *
* @param recordType
*/
public void setRecordType(ARecordType recordType);
+
+ /**
+ * Set the meta record type expected to be produced by parsers created by this factory
+ *
+ * @param metaType
+ */
+ public void setMetaType(ARecordType metaType);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 1487cf1..b49a719 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -31,7 +32,6 @@ public interface IExternalDataSourceFactory extends Serializable {
/**
* The data source type indicates whether the data source produces a continuous stream or
* a set of records
- * @author amoudi
*/
public enum DataSourceType {
STREAM,
@@ -45,21 +45,24 @@ public interface IExternalDataSourceFactory extends Serializable {
/**
* Specifies on which locations this data source is expected to run.
+ *
* @return
- * @throws Exception
+ * @throws AsterixException
*/
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws Exception;
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AsterixException;
/**
* Configure the data parser factory. The passed map contains key value pairs from the
* submitted AQL statement and any additional pairs added by the compiler
+ *
* @param configuration
- * @throws Exception
+ * @throws AsterixException
*/
- public void configure(Map<String, String> configuration) throws Exception;
+ public void configure(Map<String, String> configuration) throws AsterixException;
/**
* Specify whether the external data source can be indexed
+ *
* @return
*/
public default boolean isIndexible() {
@@ -93,5 +96,4 @@ public interface IExternalDataSourceFactory extends Serializable {
}
return constraints;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
index 0b4277e..01ffd99 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalIndexer.java
@@ -18,36 +18,39 @@
*/
package org.apache.asterix.external.api;
+import java.io.IOException;
import java.io.Serializable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
/**
- * @author amoudi
- * This Interface represents the component responsible for adding record ids to tuples when indexing external data
+ * This Interface represents the component responsible for adding record IDs to tuples when indexing external data
*/
public interface IExternalIndexer extends Serializable {
/**
* This method is called by an indexible datasource when the external source reader have been updated.
* this gives a chance for the indexer to update its reader specific values (i,e. file name)
+ *
* @param reader
- * the new reader
+ * the new reader
* @throws Exception
*/
- public void reset(IRecordReader<?> reader) throws Exception;
+ public void reset(IRecordReader<?> reader) throws IOException;
/**
* This method is called by the dataflow controller with each tuple. the indexer is expected to append record ids to the tuple.
+ *
* @param tb
* @throws Exception
*/
- public void index(ArrayTupleBuilder tb) throws Exception;
+ public void index(ArrayTupleBuilder tb) throws IOException;
/**
* This method returns the number of fields in the record id. It is used by tuple appender at the initialization step.
+ *
* @return
* @throws Exception
*/
- public int getNumberOfFields() throws Exception;
+ public int getNumberOfFields() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
deleted file mode 100644
index 3261556..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedAdapter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IFeedAdapter extends IDataSourceAdapter {
- /**
- * Pause the ingestion of data.
- * @throws HyracksDataException
- * @throws Exception
- */
- public boolean pause() throws HyracksDataException;
-
- /**
- * Resume the ingestion of data.
- * @throws HyracksDataException
- * @throws Exception
- */
- public boolean resume() throws HyracksDataException;
-
- /**
- * Discontinue the ingestion of data.
- * @throws Exception
- */
- public boolean stop() throws Exception;
-
- /**
- * @param e
- * @return true if the ingestion should continue post the exception else false
- * @throws Exception
- */
- public boolean handleException(Throwable e);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
index fe30b38..accd730 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexibleExternalDataSource.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.asterix.external.indexing.ExternalFile;
public interface IIndexibleExternalDataSource extends IExternalDataSourceFactory {
- public void setSnapshot(List<ExternalFile> files, boolean indexingOp) throws Exception;
+ public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
/**
* Specify whether the external data source is configured for indexing
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
index ed5e7b5..c247ef6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingDatasource.java
@@ -20,6 +20,4 @@ package org.apache.asterix.external.api;
public interface IIndexingDatasource {
public IExternalIndexer getIndexer();
-
- public void setIndexer(IExternalIndexer indexer);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
index 8cc4e27..b10452e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProvider.java
@@ -18,15 +18,12 @@
*/
package org.apache.asterix.external.api;
-import java.util.Map;
-
import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IInputStreamProvider {
- public AInputStream getInputStream() throws Exception;
-
- public void configure(Map<String, String> configuration);
+ public AInputStream getInputStream() throws HyracksDataException;
public void setFeedLogManager(FeedLogManager feedLogManager);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
index 3cc31dc..f52f7d3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IInputStreamProviderFactory.java
@@ -19,8 +19,15 @@
package org.apache.asterix.external.api;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IInputStreamProviderFactory extends IExternalDataSourceFactory {
- public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition) throws Exception;
+ public IInputStreamProvider createInputStreamProvider(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException;
+
+ @Override
+ public default DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
new file mode 100644
index 0000000..0f5ada4
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+
+public interface IRecordConverter<I, O> {
+
+ public O convert(IRawRecord<? extends I> input) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
index 3cb8f37..bc97ed0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParser.java
@@ -29,9 +29,4 @@ public interface IRecordDataParser<T> extends IDataParser {
* @throws Exception
*/
public void parse(IRawRecord<? extends T> record, DataOutput out) throws IOException;
-
- /**
- * @return the record class
- */
- public Class<? extends T> getRecordClass();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
index 993d947..2ddbbcd 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordDataParserFactory.java
@@ -18,15 +18,17 @@
*/
package org.apache.asterix.external.api;
-import java.io.IOException;
-
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IRecordDataParserFactory<T> extends IDataParserFactory {
- public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx)
- throws HyracksDataException, AsterixException, IOException;
+ public IRecordDataParser<T> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException;
+
+ public Class<?> getRecordClass();
- public Class<? extends T> getRecordClass();
+ @Override
+ public default DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 769db19..b4d67d4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -20,7 +20,6 @@ package org.apache.asterix.external.api;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Map;
import org.apache.asterix.external.util.FeedLogManager;
@@ -31,15 +30,6 @@ import org.apache.asterix.external.util.FeedLogManager;
public interface IRecordReader<T> extends Closeable {
/**
- * Configure the reader with the set of key/value pairs passed by the compiler
- * @param configuration
- * the set of key/value pairs
- * @throws Exception
- * when the reader can't be configured (i,e. due to incorrect configuration, unreachable source, etc.)
- */
- public void configure(Map<String, String> configuration) throws Exception;
-
- /**
* @return true if the reader has more records remaining, false, otherwise.
* @throws Exception
* if an error takes place
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
index fdc54d6..c6adbc4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReaderFactory.java
@@ -19,10 +19,17 @@
package org.apache.asterix.external.api;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IRecordReaderFactory<T> extends IExternalDataSourceFactory {
- public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition) throws Exception;
+ public IRecordReader<? extends T> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException;
- public Class<? extends T> getRecordClass();
+ public Class<?> getRecordClass();
+
+ @Override
+ public default DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
new file mode 100644
index 0000000..23c5bdd
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataAndPKParser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithMetaDataAndPKParser<T> extends IRecordDataParser<RecordWithMetadataAndPK<T>> {
+
+ public void parseMeta(RecordWithMetadataAndPK<? extends T> record, DataOutput out) throws IOException;
+
+ public void appendKeys(RecordWithMetadataAndPK<T> record, ArrayTupleBuilder tb) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
new file mode 100644
index 0000000..4b97e8d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface IRecordWithMetaDataParser<T> extends IRecordDataParser<T> {
+ public void parseMeta(DataOutput out) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
new file mode 100644
index 0000000..e6c114d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithPKDataParser.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithPKDataParser<T> extends IRecordDataParser<T> {
+
+ public void appendKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
index f596efa..ca274e8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParser.java
@@ -25,6 +25,8 @@ import java.io.InputStream;
public interface IStreamDataParser extends IDataParser {
/**
* Sets the inputStream for the parser. called only for parsers that support InputStreams
+ *
+ * @throws IOException
*/
public void setInputStream(InputStream in) throws IOException;
@@ -34,6 +36,7 @@ public interface IStreamDataParser extends IDataParser {
*
* @param out
* DataOutput instance that for writing the parser output.
+ * @throws IOException
*/
public boolean parse(DataOutput out) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
index 828f71e..ad9acc6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamDataParserFactory.java
@@ -18,12 +18,11 @@
*/
package org.apache.asterix.external.api;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IStreamDataParserFactory extends IDataParserFactory {
public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
- throws HyracksDataException, AsterixException;
+ throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
deleted file mode 100644
index d368c48..0000000
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamFlowController.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-public interface IStreamFlowController extends IDataFlowController {
- public void setStreamParser(IStreamDataParser dataParser);
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
index c0add02..22d0d6b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.api;
-import java.util.Map;
-
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -36,8 +34,6 @@ public interface ITupleForwarder {
FEED
}
- public void configure(Map<String, String> configuration) throws HyracksDataException;
-
public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index a5aaac4..bbd93c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -18,37 +18,17 @@
*/
package org.apache.asterix.external.dataflow;
-import java.util.Map;
-
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AbstractDataFlowController implements IDataFlowController {
- protected ITupleForwarder tupleForwarder;
- protected IHyracksTaskContext ctx;
- protected Map<String, String> configuration;
+ protected final ITupleForwarder tupleForwarder;
+ protected final IHyracksTaskContext ctx;
- @Override
- public ITupleForwarder getTupleForwarder() {
- return tupleForwarder;
- }
-
- @Override
- public void setTupleForwarder(ITupleForwarder tupleForwarder) {
- this.tupleForwarder = tupleForwarder;
- }
-
- protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
- tupleForwarder.initialize(ctx, writer);
- }
-
- @Override
- public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
- this.configuration = configuration;
+ public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder) {
this.ctx = ctx;
+ this.tupleForwarder = tupleForwarder;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 0c58ee3..cf4ed19 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,50 +18,28 @@
*/
package org.apache.asterix.external.dataflow;
-import java.util.Map;
-
-
import javax.annotation.Nonnull;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public abstract class AbstractFeedDataFlowController implements IDataFlowController {
- protected FeedTupleForwarder tupleForwarder;
- protected IHyracksTaskContext ctx;
- protected Map<String, String> configuration;
- protected static final int NUMBER_OF_TUPLE_FIELDS = 1;
- protected ArrayTupleBuilder tb = new ArrayTupleBuilder(NUMBER_OF_TUPLE_FIELDS);
- protected FeedLogManager feedLogManager;
+ protected final FeedTupleForwarder tupleForwarder;
+ protected final IHyracksTaskContext ctx;
+ protected final int numOfFields;
+ protected final ArrayTupleBuilder tb;
+ protected final FeedLogManager feedLogManager;
- public AbstractFeedDataFlowController(@Nonnull FeedLogManager feedLogManager) {
+ public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+ @Nonnull FeedLogManager feedLogManager, int numOfFields) {
this.feedLogManager = feedLogManager;
- }
-
- @Override
- public ITupleForwarder getTupleForwarder() {
- return tupleForwarder;
- }
-
- @Override
- public void setTupleForwarder(ITupleForwarder tupleForwarder) {
- this.tupleForwarder = (FeedTupleForwarder) tupleForwarder;
- }
-
- protected void initializeTupleForwarder(IFrameWriter writer) throws HyracksDataException {
- tupleForwarder.configure(configuration);
- tupleForwarder.initialize(ctx, writer);
- }
-
- @Override
- public void configure(Map<String, String> configuration, IHyracksTaskContext ctx) {
- this.configuration = configuration;
+ this.numOfFields = numOfFields;
this.ctx = ctx;
+ this.tupleForwarder = tupleForwarder;
+ this.tb = new ArrayTupleBuilder(numOfFields);
}
@Override
@@ -76,7 +54,13 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
return true;
}
+ @Override
public void flush() throws HyracksDataException {
tupleForwarder.flush();
}
+
+ @Override
+ public abstract boolean stop() throws HyracksDataException;
+
+ public abstract boolean handleException(Throwable th);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
new file mode 100644
index 0000000..8ec422f
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordWithPKDataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
+
+ private final IRecordWithPKDataParser<T> dataParser;
+
+ public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+ final FeedLogManager feedLogManager, final int numOfOutputFields,
+ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ this.dataParser = dataParser;
+ }
+
+ @Override
+ protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+ dataParser.appendKeys(tb, record);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
new file mode 100644
index 0000000..370eec0
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
+
+ public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
+ final FeedLogManager feedLogManager, final int numOfOutputFields,
+ final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ }
+
+ @Override
+ protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
+ dataParser.appendPK(tb);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 93f866c..db95a6a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -52,20 +52,27 @@ public class CounterTimerTupleForwarder implements ITupleForwarder {
private Object lock = new Object();
private boolean activeTimer = false;
- @Override
- public void configure(Map<String, String> configuration) {
+ private CounterTimerTupleForwarder(int batchSize, long batchInterval) {
+ this.batchSize = batchSize;
+ this.batchInterval = batchInterval;
+ if (batchInterval > 0L) {
+ activeTimer = true;
+ }
+ }
+
+ // Factory method
+ public static CounterTimerTupleForwarder create(Map<String, String> configuration) {
+ int batchSize = -1;
+ long batchInterval = 0L;
String propValue = configuration.get(BATCH_SIZE);
if (propValue != null) {
batchSize = Integer.parseInt(propValue);
- } else {
- batchSize = -1;
}
-
propValue = configuration.get(BATCH_INTERVAL);
if (propValue != null) {
batchInterval = Long.parseLong(propValue);
- activeTimer = true;
}
+ return new CounterTimerTupleForwarder(batchSize, batchInterval);
}
@Override
@@ -152,6 +159,5 @@ public class CounterTimerTupleForwarder implements ITupleForwarder {
e.printStackTrace();
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 3408af9..2cc3c66 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.dataflow;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
@@ -29,7 +30,9 @@ import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.log4j.Logger;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -37,12 +40,13 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
protected final IRecordDataParser<T> dataParser;
protected final IRecordReader<? extends T> recordReader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
- protected long interval;
+ protected final long interval = 1000;
protected boolean failed = false;
- public FeedRecordDataFlowController(@Nonnull FeedLogManager feedLogManager,
- @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<T> recordReader) {
- super(feedLogManager);
+ public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+ @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
+ @Nonnull IRecordReader<T> recordReader) {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
recordReader.setFeedLogManager(feedLogManager);
@@ -54,7 +58,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
HyracksDataException hde = null;
try {
failed = false;
- initializeTupleForwarder(writer);
+ tupleForwarder.initialize(ctx, writer);
while (recordReader.hasNext()) {
IRawRecord<? extends T> record = recordReader.next();
if (record == null) {
@@ -65,6 +69,8 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
tb.reset();
dataParser.parse(record, tb.getDataOutput());
tb.addFieldEndOffset();
+ addMetaPart(tb, record);
+ addPrimaryKeys(tb, record);
if (tb.getSize() > tupleForwarder.getMaxRecordSize()) {
// log
feedLogManager.logRecord(record.toString(), ExternalDataConstants.LARGE_RECORD_ERROR_MESSAGE);
@@ -96,6 +102,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
}
}
+ protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+ }
+
+ protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
+ }
+
private void closeSignal() {
synchronized (closed) {
closed.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/d3338f66/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 580e350..f233971 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -19,25 +19,28 @@
package org.apache.asterix.external.dataflow;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.IStreamFlowController;
import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class FeedStreamDataFlowController extends AbstractFeedDataFlowController implements IStreamFlowController {
+public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
- private IStreamDataParser dataParser;
- private AInputStream stream;
+ private final IStreamDataParser dataParser;
+ private final AInputStream stream;
- public FeedStreamDataFlowController(FeedLogManager feedLogManager) {
- super(feedLogManager);
+ public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
+ FeedLogManager feedLogManager, int numOfFields, IStreamDataParser streamParser, AInputStream inputStream) {
+ super(ctx, tupleForwarder, feedLogManager, numOfFields);
+ this.dataParser = streamParser;
+ this.stream = inputStream;
}
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
try {
- initializeTupleForwarder(writer);
+ tupleForwarder.initialize(ctx, writer);
while (true) {
tb.reset();
if (!dataParser.parse(tb.getDataOutput())) {
@@ -80,13 +83,4 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController
}
return handled;
}
-
- @Override
- public void setStreamParser(IStreamDataParser dataParser) {
- this.dataParser = dataParser;
- }
-
- public void setStream(AInputStream stream) {
- this.stream = stream;
- }
}