You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2016/10/15 18:02:02 UTC
[8/8] asterixdb git commit: Move Twitter Firehose Datasource to Test Source Folder
Move Twitter Firehose Datasource to Test Source Folder
Change-Id: Iefe2130707012b8ce60f5dfac96635a1a515a076
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1290
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/cb92dad7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/cb92dad7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/cb92dad7
Branch: refs/heads/master
Commit: cb92dad732fb6094c70bfbd75cbfabe7352c3923
Parents: 9ddf5e0
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat Oct 15 08:50:24 2016 -0700
Committer: Michael Blow <mb...@apache.org>
Committed: Sat Oct 15 11:01:05 2016 -0700
----------------------------------------------------------------------
.../queries/feeds/feeds_07/feeds_07.1.ddl.aql | 4 +-
.../queries/feeds/feeds_08/feeds_08.1.ddl.aql | 3 +-
.../queries/feeds/feeds_09/feeds_09.1.ddl.aql | 4 +-
.../feeds/feeds_07/feeds_07.1.ddl.sqlpp | 4 +-
.../feeds/feeds_08/feeds_08.1.ddl.sqlpp | 3 +-
.../feeds/feeds_09/feeds_09.1.ddl.sqlpp | 4 +-
asterixdb/asterix-experiments/pom.xml | 1 +
asterixdb/asterix-external-data/pom.xml | 10 +-
.../stream/TwitterFirehoseInputStream.java | 158 ---
.../factory/TwitterFirehoseStreamFactory.java | 102 --
.../provider/DatasourceFactoryProvider.java | 6 -
.../asterix/external/util/DataGenerator.java | 1189 ----------------
.../external/util/ExternalDataConstants.java | 1 -
.../asterix/external/util/TweetGenerator.java | 154 --
.../external/generator/DataGenerator.java | 1192 ++++++++++++++++
.../external/generator/TweetGenerator.java | 154 ++
.../stream/TwitterFirehoseInputStream.java | 158 +++
.../stream/TwitterFirehoseStreamFactory.java | 102 ++
asterixdb/asterix-tools/pom.xml | 18 +-
.../asterix/tools/datagen/AdgClientDriver.java | 52 -
.../asterix/tools/datagen/AdmDataGen.java | 1020 -------------
.../asterix/tools/datagen/CustOrdDataGen.java | 476 -------
.../asterix/tools/datagen/EventDataGen.java | 237 ----
.../DataGeneratorForSpatialIndexEvaluation.java | 1341 ------------------
.../tools/external/data/GULongIDGenerator.java | 50 -
...TweetGeneratorForSpatialIndexEvaluation.java | 139 --
.../apache/asterix/tools/tbltoadm/TblToAdm.java | 97 --
.../tools/translator/ADGenDmlTranslator.java | 91 --
.../asterix/tools/datagen/AdgClientDriver.java | 52 +
.../asterix/tools/datagen/AdmDataGen.java | 1020 +++++++++++++
.../asterix/tools/datagen/CustOrdDataGen.java | 476 +++++++
.../asterix/tools/datagen/EventDataGen.java | 237 ++++
.../DataGeneratorForSpatialIndexEvaluation.java | 1341 ++++++++++++++++++
.../tools/external/data/GULongIDGenerator.java | 50 +
...TweetGeneratorForSpatialIndexEvaluation.java | 139 ++
.../apache/asterix/tools/tbltoadm/TblToAdm.java | 97 ++
.../tools/translator/ADGenDmlTranslator.java | 91 ++
37 files changed, 5139 insertions(+), 5134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
index 70ea8d6..f3f8f7d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
@@ -54,12 +54,12 @@ create dataset SyntheticTweets(TweetMessageType)
primary key id;
create feed SyntheticTweetFeed
-using twitter_firehose(
+using stream(
("duration"="5"),
("tps"="50"),
("type-name"="TweetMessageType"),
("format"="adm"),
-("reader-stream"="twitter_firehose"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
("tput-duration"="5"),
("dataverse-dataset"="feeds:SyntheticTweets"),
("mode"="controlled"));
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
index 658487b..c339563 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
@@ -58,8 +58,9 @@ primary key id;
create index ngram_index on SyntheticTweets(message_text) type ngram(3);
create feed SyntheticTweetFeed
-using twitter_firehose
+using stream
(("duration"="5"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
("tps"="50"),
("type-name"="TweetMessageType"),
("tput-duration"="5"),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
index 6714850..59385c4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
@@ -56,12 +56,12 @@ primary key id;
create index message_text on SyntheticTweets(message_text) type btree;
create feed SyntheticTweetFeed
-using twitter_firehose
+using stream
(("duration"="5"),
("tps"="50"),
("tput-duration"="5"),
("type-name"="TweetMessageType"),
("dataverse-dataset"="feeds:SyntheticTweets"),
("format"="adm"),
-("reader-stream"="twitter_firehose"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
("mode"="controlled"));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
index e3f3ae5..1f24192 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
@@ -45,12 +45,12 @@ create type feeds.TweetMessageType as
create dataset SyntheticTweets(TweetMessageType) primary key id;
-create primary feed SyntheticTweetFeed using twitter_firehose(
+create primary feed SyntheticTweetFeed using stream(
(`duration`=`5`),
(`tps`=`50`),
(`type-name`=`TweetMessageType`),
(`format`=`adm`),
-(`reader-stream`=`twitter_firehose`),
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),
(`tput-duration`=`5`),
(`dataverse-dataset`=`feeds:SyntheticTweets`),
(`mode`=`controlled`));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
index a98b745..6311b8b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
@@ -45,9 +45,10 @@ create type feeds.TweetMessageType as
create dataset SyntheticTweets(TweetMessageType) primary key id;
create index ngram_index on SyntheticTweets (message_text) type ngram (3);
-create primary feed SyntheticTweetFeed using twitter_firehose (
+create primary feed SyntheticTweetFeed using stream (
(`duration`=`5`),
(`tps`=`50`),
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),
(`type-name`=`TweetMessageType`),
(`tput-duration`=`5`),
(`dataverse-dataset`=`feeds:SyntheticTweets`),
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
index 1b1c780..71a0ca2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
@@ -45,7 +45,7 @@ create type feeds_09.TweetMessageType as
}
create dataset SyntheticTweets(TweetMessageType) primary key id;
create index message_text on SyntheticTweets (message_text) type btree;
-create primary feed SyntheticTweetFeed using twitter_firehose ((`duration`=`5`),
+create primary feed SyntheticTweetFeed using stream ((`duration`=`5`),
(`tps`=`50`),(`tput-duration`=`5`),(`type-name`=`TweetMessageType`),
(`dataverse-dataset`=`feeds:SyntheticTweets`),(`format`=`adm`),
-(`reader-stream`=`twitter_firehose`),(`mode`=`controlled`));
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),(`mode`=`controlled`));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-experiments/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/pom.xml b/asterixdb/asterix-experiments/pom.xml
index b923fba..2718565 100644
--- a/asterixdb/asterix-experiments/pom.xml
+++ b/asterixdb/asterix-experiments/pom.xml
@@ -153,6 +153,7 @@
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-tools</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
</dependency>
<dependency>
<groupId>com.hierynomus</groupId>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 245f340..0cc6826 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -202,8 +202,6 @@
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-runtime</artifactId>
<version>${project.version}</version>
- <type>jar</type>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
@@ -242,8 +240,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
- <type>jar</type>
- <scope>compile</scope>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
@@ -269,8 +266,6 @@
<groupId>net.java.dev.rome</groupId>
<artifactId>rome-fetcher</artifactId>
<version>1.0.0</version>
- <type>jar</type>
- <scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>rome</artifactId>
@@ -287,11 +282,13 @@
<groupId>jdom</groupId>
<artifactId>jdom</artifactId>
<version>1.0</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.windowsazure</groupId>
<artifactId>microsoft-windowsazure-api</artifactId>
<version>0.4.4</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
@@ -302,6 +299,7 @@
<groupId>javax.jdo</groupId>
<artifactId>jdo2-api</artifactId>
<version>2.3-20090302111651</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.e-movimento.tinytools</groupId>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
deleted file mode 100644
index e2afd7b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class TwitterFirehoseInputStream extends AsterixInputStream {
-
- private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName());
- private final ExecutorService executorService;
- private final PipedOutputStream outputStream;
- private final PipedInputStream inputStream;
- private final DataProvider dataProvider;
- private boolean started;
-
- public TwitterFirehoseInputStream(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
- throws IOException {
- executorService = Executors.newCachedThreadPool();
- outputStream = new PipedOutputStream();
- inputStream = new PipedInputStream(outputStream);
- dataProvider = new DataProvider(configuration, partition, outputStream);
- started = false;
- }
-
- @Override
- public boolean stop() throws IOException {
- dataProvider.stop();
- return true;
- }
-
- public synchronized void start() {
- if (!started) {
- executorService.execute(dataProvider);
- started = true;
- }
- }
-
- @Override
- public int read() throws IOException {
- if (!started) {
- start();
- }
- return inputStream.read();
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- if (!started) {
- start();
- }
- return inputStream.read(b, off, len);
- }
-
- @Override
- public boolean handleException(Throwable th) {
- return false;
- }
-
- private static class DataProvider implements Runnable {
-
- public static final String KEY_MODE = "mode";
-
- private final TweetGenerator tweetGenerator;
- private boolean continuePush = true;
- private int batchSize;
- private final Mode mode;
- private final OutputStream os;
-
- public static enum Mode {
- AGGRESSIVE,
- CONTROLLED
- }
-
- public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
- this.tweetGenerator = new TweetGenerator(configuration, partition);
- this.tweetGenerator.registerSubscriber(os);
- this.os = os;
- mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
- : Mode.AGGRESSIVE;
- switch (mode) {
- case CONTROLLED:
- String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
- if (tpsValue == null) {
- throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
- }
- batchSize = Integer.parseInt(tpsValue);
- break;
- case AGGRESSIVE:
- batchSize = 5000;
- break;
- }
- }
-
- @Override
- public void run() {
- boolean moreData = true;
- long startBatch;
- long endBatch;
- while (true) {
- try {
- while (moreData && continuePush) {
- switch (mode) {
- case AGGRESSIVE:
- moreData = tweetGenerator.generateNextBatch(batchSize);
- break;
- case CONTROLLED:
- startBatch = System.currentTimeMillis();
- moreData = tweetGenerator.generateNextBatch(batchSize);
- endBatch = System.currentTimeMillis();
- if ((endBatch - startBatch) < 1000) {
- Thread.sleep(1000 - (endBatch - startBatch));
- }
- break;
- }
- }
- os.close();
- break;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in adapter " + e.getMessage());
- }
- }
- }
- }
-
- public void stop() {
- continuePush = false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
deleted file mode 100644
index abe67fd..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IInputStreamFactory;
-import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
- * simulates a twitter firehose with tweets being "pushed" into Asterix at a
- * configurable rate measured in terms of TPS (tweets/second). The stream of
- * tweets lasts for a configurable duration (measured in seconds).
- */
-public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Degree of parallelism for feed ingestion activity. Defaults to 1. This
- * determines the count constraint for the ingestion operator.
- **/
- private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
-
- /**
- * The absolute locations where ingestion operator instances will be placed.
- **/
- private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
-
- private Map<String, String> configuration;
-
- @Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
- String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
- String[] locations = null;
- if (ingestionLocationParam != null) {
- locations = ingestionLocationParam.split(",");
- }
- int count = locations != null ? locations.length : 1;
- if (ingestionCardinalityParam != null) {
- count = Integer.parseInt(ingestionCardinalityParam);
- }
-
- List<String> chosenLocations = new ArrayList<String>();
- String[] availableLocations = locations != null ? locations
- : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
- for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
- chosenLocations.add(availableLocations[k]);
- }
- return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
- }
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.STREAM;
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
- @Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- try {
- return new TwitterFirehoseInputStream(configuration, ctx, partition);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index ad11171..7ab6430 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -33,7 +33,6 @@ import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordRead
import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
-import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -73,9 +72,6 @@ public class DatasourceFactoryProvider {
case ExternalDataConstants.STREAM_SOCKET_CLIENT:
streamSourceFactory = new SocketServerInputStreamFactory();
break;
- case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
- streamSourceFactory = new TwitterFirehoseStreamFactory();
- break;
default:
try {
streamSourceFactory = (IInputStreamFactory) Class.forName(streamSource).newInstance();
@@ -102,8 +98,6 @@ public class DatasourceFactoryProvider {
case ExternalDataConstants.READER_PUSH_TWITTER:
case ExternalDataConstants.READER_PULL_TWITTER:
return new TwitterRecordReaderFactory();
- case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
- return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
case ExternalDataConstants.SOCKET:
return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());