You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by xi...@apache.org on 2017/03/25 00:55:44 UTC
[3/4] asterixdb git commit: Expose TwitterFirehose Stream to Users
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
new file mode 100644
index 0000000..9431a24
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.generator;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.generator.DataGenerator.TweetMessage;
+import org.apache.asterix.external.generator.DataGenerator.TweetMessageIterator;
+
+/**
+ * Produces tweets from a {@link DataGenerator} and pushes their textual form to every
+ * registered subscriber {@link OutputStream}.
+ * <p>
+ * Tweets are accumulated in a fixed-size buffer and sent to the subscribers whenever the
+ * buffer cannot hold the next tweet, or when the underlying iterator reports that no more
+ * data is available. The subscriber list is guarded by an internal lock; the counters and
+ * the buffer are not synchronized and are expected to be driven by a single producer thread.
+ */
+public class TweetGenerator {
+    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
+
+    private static final int DEFAULT_DURATION = INFINITY;
+    /** Capacity of the staging buffer, in bytes. */
+    private static final int BUFFER_SIZE = 32 * 1024;
+
+    private final int duration;
+    private final TweetMessageIterator tweetIterator;
+    private final int partition;
+    private long tweetCount = 0;
+    private int frameTweetCount = 0;
+    private int numFlushedTweets = 0;
+    private final DataGenerator dataGenerator;
+    private final ByteBuffer outputBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+    private final String[] fields;
+    private final List<OutputStream> subscribers;
+    private final Object lock = new Object();
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<OutputStream>();
+
+    /**
+     * @param configuration
+     *            adapter configuration; {@link #KEY_DURATION} (defaults to {@link #INFINITY})
+     *            and {@link #KEY_FIELDS} (comma-separated, optional) are read here
+     * @param partition
+     *            partition number, used only in log output
+     * @throws NumberFormatException
+     *             if the configured duration is not an integer
+     */
+    public TweetGenerator(Map<String, String> configuration, int partition) {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+        dataGenerator = new DataGenerator();
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        String fieldList = configuration.get(KEY_FIELDS);
+        this.fields = fieldList != null ? fieldList.split(",") : null;
+        this.subscribers = new ArrayList<OutputStream>();
+    }
+
+    /**
+     * Appends one tweet (followed by a newline) to the staging buffer, flushing the buffer
+     * first when the tweet does not fit.
+     */
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+        tweetCount++;
+        // Encode explicitly: the no-arg getBytes() depends on the platform default charset.
+        byte[] b = tweet.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        if (b.length > outputBuffer.remaining() && outputBuffer.position() > 0) {
+            flush();
+        }
+        if (b.length > outputBuffer.capacity()) {
+            // A single tweet larger than the whole buffer can never be staged; putting it
+            // would throw BufferOverflowException, so hand it to the subscribers directly.
+            broadcast(b, b.length);
+            numFlushedTweets++;
+            return;
+        }
+        outputBuffer.put(b);
+        frameTweetCount++;
+    }
+
+    /**
+     * Sends the staged bytes to all subscribers, updates the flushed-tweet counter and
+     * resets the buffer for reuse.
+     */
+    private void flush() throws IOException {
+        outputBuffer.flip();
+        broadcast(outputBuffer.array(), outputBuffer.limit());
+        // Account for the flushed frame here so that every flush (including the final one
+        // issued from generateNextBatch) is reflected in getNumFlushedTweets().
+        numFlushedTweets += frameTweetCount;
+        frameTweetCount = 0;
+        outputBuffer.clear();
+    }
+
+    /**
+     * Writes {@code length} bytes of {@code data} to every subscriber. A subscriber whose
+     * write fails is dropped from the subscriber list; the failure is logged, not rethrown.
+     */
+    private void broadcast(byte[] data, int length) {
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(data, 0, length);
+                } catch (Exception e) {
+                    LOGGER.log(Level.INFO, "OutputStream failed. Add it into the removal list.", e);
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
+    }
+
+    /**
+     * Generates up to one batch of tweets.
+     * <p>
+     * The iterator is consulted once, before the batch starts; when it has data, exactly
+     * {@code numTweets} tweets are produced. When it has none, any staged bytes are flushed.
+     *
+     * @param numTweets
+     *            number of tweets to produce in this batch
+     * @return {@code true} if a batch was produced, {@code false} if the iterator reported
+     *         that no more data is available
+     * @throws IOException
+     *             declared for callers; subscriber write failures are handled internally
+     */
+    public boolean generateNextBatch(int numTweets) throws IOException {
+        if (!tweetIterator.hasNext()) {
+            if (outputBuffer.position() > 0) {
+                flush();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
+            return false;
+        }
+        for (int count = 0; count < numTweets; count++) {
+            writeTweetString(tweetIterator.next());
+        }
+        return true;
+    }
+
+    /** @return the number of tweets that have been handed to the subscribers so far */
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    /** Adds a stream that will receive all subsequently flushed data. */
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    /** Removes a previously registered stream; the stream itself is not closed. */
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    /**
+     * Closes every registered subscriber stream. All streams are attempted even if one of
+     * them fails.
+     *
+     * @throws IOException
+     *             the first failure encountered; later failures are attached as suppressed
+     */
+    public void close() throws IOException {
+        IOException failure = null;
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    if (failure == null) {
+                        failure = e;
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** @return {@code true} if at least one subscriber is currently registered */
+    public boolean isSubscribed() {
+        // Read under the same lock that guards every other access to the list.
+        synchronized (lock) {
+            return !subscribers.isEmpty();
+        }
+    }
+
+    /** @return the total number of tweets generated so far (flushed or still staged) */
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
new file mode 100644
index 0000000..3928b19
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.generator.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * An input stream backed by a {@link TweetGenerator}. A background task writes generated
+ * tweets into a pipe; {@link #read()} and {@link #read(byte[], int, int)} consume the other
+ * end of that pipe. The background task is started lazily on the first read.
+ */
+public class TwitterFirehoseInputStream extends AsterixInputStream {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName());
+    private final ExecutorService executorService;
+    private final PipedOutputStream outputStream;
+    private final PipedInputStream inputStream;
+    private final DataProvider dataProvider;
+    // Read without the monitor in read(); volatile makes the write in start() visible.
+    private volatile boolean started;
+
+    /**
+     * @param configuration
+     *            adapter configuration, forwarded to the data provider
+     * @param partition
+     *            partition number, forwarded to the tweet generator
+     * @throws IOException
+     *             if the pipe cannot be connected
+     */
+    public TwitterFirehoseInputStream(Map<String, String> configuration, int partition)
+            throws IOException {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        dataProvider = new DataProvider(configuration, partition, outputStream);
+        started = false;
+    }
+
+    /**
+     * Asks the producer to stop after its current batch. The producer closes the write end
+     * of the pipe when it exits, so readers subsequently observe end of stream.
+     */
+    @Override
+    public boolean stop() throws IOException {
+        dataProvider.stop();
+        return true;
+    }
+
+    /** Starts the producer task if it has not been started yet; later calls are no-ops. */
+    public synchronized void start() {
+        if (!started) {
+            executorService.execute(dataProvider);
+            // Only one task is ever submitted; shutting down now lets the worker thread
+            // terminate as soon as that task completes. Already-submitted tasks still run.
+            executorService.shutdown();
+            started = true;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read(b, off, len);
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /**
+     * Producer task: repeatedly asks the {@link TweetGenerator} for a batch until the
+     * generator runs out of data or {@link #stop()} is called, then closes the output stream.
+     */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        /** Batch size used when no rate limit is configured. */
+        private static final int AGGRESSIVE_BATCH_SIZE = 5000;
+
+        private final TweetGenerator tweetGenerator;
+        // Written by the thread calling stop(), read by the producer thread.
+        private volatile boolean continuePush = true;
+        private int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        public enum Mode {
+            /** Generate batches back to back with no pause. */
+            AGGRESSIVE,
+            /** Generate one batch of "tps" tweets, then wait out the rest of the second. */
+            CONTROLLED
+        }
+
+        /**
+         * @throws IllegalArgumentException
+         *             if the mode is unknown, or the mode is CONTROLLED and no tps value is set
+         * @throws NumberFormatException
+         *             if the tps value is not an integer
+         */
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            String modeValue = configuration.get(KEY_MODE);
+            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+            mode = modeValue != null ? Mode.valueOf(modeValue.toUpperCase(java.util.Locale.ROOT))
+                    : Mode.AGGRESSIVE;
+            switch (mode) {
+                case CONTROLLED:
+                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                    if (tpsValue == null) {
+                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                    }
+                    batchSize = Integer.parseInt(tpsValue);
+                    break;
+                case AGGRESSIVE:
+                    batchSize = AGGRESSIVE_BATCH_SIZE;
+                    break;
+            }
+        }
+
+        @Override
+        public void run() {
+            boolean moreData = true;
+            try {
+                while (moreData && continuePush) {
+                    try {
+                        switch (mode) {
+                            case AGGRESSIVE:
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                break;
+                            case CONTROLLED:
+                                long startBatch = System.currentTimeMillis();
+                                moreData = tweetGenerator.generateNextBatch(batchSize);
+                                long elapsed = System.currentTimeMillis() - startBatch;
+                                if (elapsed < 1000) {
+                                    Thread.sleep(1000 - elapsed);
+                                }
+                                break;
+                        }
+                    } catch (InterruptedException e) {
+                        // Restore the flag and stop producing instead of swallowing the interrupt.
+                        Thread.currentThread().interrupt();
+                        break;
+                    } catch (Exception e) {
+                        // Best effort: log and try the next batch.
+                        LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                    }
+                }
+            } finally {
+                // Always close the write end so the reader sees end of stream; a failing
+                // close is logged once rather than retried.
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                }
+            }
+        }
+
+        /** Requests that the producer loop exit after the batch currently in progress. */
+        public void stop() {
+            continuePush = false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
new file mode 100644
index 0000000..936f1f8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
+ * simulates a twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
+ */
+public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Degree of parallelism for feed ingestion activity. Defaults to 1. This
+ * determines the count constraint for the ingestion operator.
+ **/
+ private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+ /**
+ * The absolute locations where ingestion operator instances will be placed.
+ **/
+ private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+ private Map<String, String> configuration;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+ String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+ String[] locations = null;
+ if (ingestionLocationParam != null) {
+ locations = ingestionLocationParam.split(",");
+ }
+ int count = locations != null ? locations.length : 1;
+ if (ingestionCardinalityParam != null) {
+ count = Integer.parseInt(ingestionCardinalityParam);
+ }
+
+ List<String> chosenLocations = new ArrayList<>();
+ String[] availableLocations = locations != null ? locations
+ : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
+ for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+ chosenLocations.add(availableLocations[k]);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ try {
+ return new TwitterFirehoseInputStream(configuration, partition);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}