You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by xi...@apache.org on 2017/03/25 00:55:44 UTC

[3/4] asterixdb git commit: Expose TwitterFirehose Stream to Users

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
new file mode 100644
index 0000000..9431a24
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/generator/TweetGenerator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.generator;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.generator.DataGenerator.TweetMessage;
+import org.apache.asterix.external.generator.DataGenerator.TweetMessageIterator;
+
+/**
+ * Produces synthetic tweets from a {@link DataGenerator} and fans them out, in
+ * buffered chunks, to every registered {@link OutputStream} subscriber.
+ * <p>
+ * Tweets are serialized one per line and accumulated in a fixed-size buffer. The
+ * buffer is written to all subscribers whenever the next tweet would not fit,
+ * and once more when the underlying iterator reports that it has no more data.
+ * A subscriber whose write fails is dropped from the subscriber list.
+ * <p>
+ * The subscriber list is guarded by an internal lock; the output buffer and the
+ * counters are not synchronized.
+ */
+public class TweetGenerator {
+    private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+    public static final String KEY_DURATION = "duration";
+    public static final String KEY_TPS = "tps";
+    public static final String KEY_VERBOSE = "verbose";
+    public static final String KEY_FIELDS = "fields";
+    public static final int INFINITY = 0;
+
+    private static final int DEFAULT_DURATION = INFINITY;
+    /** Capacity, in bytes, of the buffer in which serialized tweets are batched. */
+    private static final int OUTPUT_BUFFER_SIZE = 32 * 1024;
+
+    private final int duration;
+    private final TweetMessageIterator tweetIterator;
+    // Only used to tag the end-of-data log message.
+    private final int partition;
+    private long tweetCount = 0;
+    // Number of tweets currently sitting in outputBuffer, not yet flushed.
+    private int frameTweetCount = 0;
+    private int numFlushedTweets = 0;
+    private final DataGenerator dataGenerator;
+    private final ByteBuffer outputBuffer = ByteBuffer.allocate(OUTPUT_BUFFER_SIZE);
+    private final String[] fields;
+    private final List<OutputStream> subscribers = new ArrayList<>();
+    private final Object lock = new Object();
+    // Scratch list reused by flush(); only touched while holding the lock.
+    private final List<OutputStream> subscribersForRemoval = new ArrayList<>();
+
+    /**
+     * Creates a generator configured from the given key/value pairs.
+     *
+     * @param configuration
+     *            adapter configuration; {@link #KEY_DURATION} (an integer, defaulting
+     *            to {@link #INFINITY}) is handed to the tweet iterator and
+     *            {@link #KEY_FIELDS} (a comma-separated list, may be absent) is handed
+     *            to the tweet serializer
+     * @param partition
+     *            partition number, used only in log output
+     * @throws NumberFormatException
+     *             if the configured duration is not an integer
+     */
+    public TweetGenerator(Map<String, String> configuration, int partition) {
+        this.partition = partition;
+        String value = configuration.get(KEY_DURATION);
+        this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+        dataGenerator = new DataGenerator();
+        tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+        String fieldList = configuration.get(KEY_FIELDS);
+        this.fields = fieldList != null ? fieldList.split(",") : null;
+    }
+
+    /**
+     * Serializes one tweet, followed by a newline, into the output buffer,
+     * flushing the buffer first if the tweet would not fit.
+     */
+    private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+        String tweet = tweetMessage.getAdmEquivalent(fields) + "\n";
+        tweetCount++;
+        // Encode explicitly as UTF-8 so the bytes do not depend on the JVM's platform default charset.
+        byte[] b = tweet.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        if (b.length > outputBuffer.remaining()) {
+            flush();
+        }
+        outputBuffer.put(b);
+        frameTweetCount++;
+    }
+
+    /**
+     * Writes the buffered bytes to every subscriber, drops subscribers whose write
+     * failed, resets the buffer and moves the buffered tweets into the flushed
+     * count.
+     */
+    private void flush() throws IOException {
+        outputBuffer.flip();
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.write(outputBuffer.array(), 0, outputBuffer.limit());
+                } catch (Exception e) {
+                    // Removal is deferred: the list cannot be modified while it is being iterated.
+                    LOGGER.log(Level.INFO, "OutputStream failed. Add it into the removal list.", e);
+                    subscribersForRemoval.add(os);
+                }
+            }
+            if (!subscribersForRemoval.isEmpty()) {
+                subscribers.removeAll(subscribersForRemoval);
+                subscribersForRemoval.clear();
+            }
+        }
+        outputBuffer.clear();
+        // Account here rather than at the call sites so that the final flush is counted as well.
+        numFlushedTweets += frameTweetCount;
+        frameTweetCount = 0;
+    }
+
+    /**
+     * Generates up to one batch of tweets.
+     * <p>
+     * The iterator is consulted once, before the batch; if it has data,
+     * {@code numTweets} tweets are generated without re-checking it. If it has no
+     * data, any buffered bytes are flushed to the subscribers.
+     *
+     * @param numTweets
+     *            number of tweets to generate in this batch
+     * @return {@code true} if a batch was generated, {@code false} if the iterator
+     *         had no more data
+     * @throws IOException
+     *             declared for callers; write failures on individual subscribers are
+     *             handled by dropping the subscriber
+     */
+    public boolean generateNextBatch(int numTweets) throws IOException {
+        if (!tweetIterator.hasNext()) {
+            if (outputBuffer.position() > 0) {
+                flush();
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+            }
+            return false;
+        }
+        for (int count = 0; count < numTweets; count++) {
+            writeTweetString(tweetIterator.next());
+        }
+        return true;
+    }
+
+    /**
+     * @return the number of tweets that have been handed to {@code flush()} so far
+     */
+    public int getNumFlushedTweets() {
+        return numFlushedTweets;
+    }
+
+    /**
+     * Adds a stream that will receive every subsequently flushed chunk.
+     */
+    public void registerSubscriber(OutputStream os) {
+        synchronized (lock) {
+            subscribers.add(os);
+        }
+    }
+
+    /**
+     * Removes a previously registered stream; the stream is not closed.
+     */
+    public void deregisterSubscribers(OutputStream os) {
+        synchronized (lock) {
+            subscribers.remove(os);
+        }
+    }
+
+    /**
+     * Closes every registered subscriber. All subscribers are attempted even if
+     * some fail to close.
+     *
+     * @throws IOException
+     *             the first close failure, with any later failures attached as
+     *             suppressed exceptions
+     */
+    public void close() throws IOException {
+        IOException failure = null;
+        synchronized (lock) {
+            for (OutputStream os : subscribers) {
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    if (failure == null) {
+                        failure = e;
+                    } else {
+                        failure.addSuppressed(e);
+                    }
+                }
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * @return {@code true} if at least one subscriber is currently registered
+     */
+    public boolean isSubscribed() {
+        synchronized (lock) {
+            return !subscribers.isEmpty();
+        }
+    }
+
+    /**
+     * @return the number of tweets serialized so far, flushed or not
+     */
+    public long getTweetCount() {
+        return tweetCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
new file mode 100644
index 0000000..3928b19
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.generator.TweetGenerator;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * An input stream whose bytes are produced by a {@link TweetGenerator} running
+ * on a background thread and delivered through an in-memory pipe.
+ * <p>
+ * The background producer is started lazily on the first read (or by an
+ * explicit call to {@link #start()}) and is asked to stop by {@link #stop()}.
+ */
+public class TwitterFirehoseInputStream extends AsterixInputStream {
+
+    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName());
+    private final ExecutorService executorService;
+    private final PipedOutputStream outputStream;
+    private final PipedInputStream inputStream;
+    private final DataProvider dataProvider;
+    // volatile: read without a lock on the read() fast path, written under the instance lock.
+    private volatile boolean started;
+
+    /**
+     * Wires a pipe between a new background producer and this stream; the
+     * producer is not started yet.
+     *
+     * @param configuration
+     *            adapter configuration, forwarded to the producer
+     * @param partition
+     *            partition number, forwarded to the producer
+     * @throws IOException
+     *             if the two pipe ends cannot be connected
+     */
+    public TwitterFirehoseInputStream(Map<String, String> configuration, int partition)
+            throws IOException {
+        executorService = Executors.newCachedThreadPool();
+        outputStream = new PipedOutputStream();
+        inputStream = new PipedInputStream(outputStream);
+        dataProvider = new DataProvider(configuration, partition, outputStream);
+        started = false;
+    }
+
+    /**
+     * Asks the producer to stop after its current batch and releases the executor.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean stop() throws IOException {
+        dataProvider.stop();
+        synchronized (this) {
+            if (!started) {
+                // The producer never ran, so nobody else will close the write end of the pipe.
+                // Close it here so that a later read() sees end-of-stream instead of blocking,
+                // and mark the stream as started so the producer is not submitted afterwards.
+                started = true;
+                outputStream.close();
+            }
+        }
+        // Lets an already running producer finish, but allows the pool's threads to be reclaimed.
+        executorService.shutdown();
+        return true;
+    }
+
+    /**
+     * Starts the background producer; calls after the first one have no effect.
+     */
+    public synchronized void start() {
+        if (!started) {
+            executorService.execute(dataProvider);
+            started = true;
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!started) {
+            start();
+        }
+        return inputStream.read(b, off, len);
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /**
+     * Background task that repeatedly asks a {@link TweetGenerator} for the next
+     * batch until the generator has no more data or {@link #stop()} is called, and
+     * then closes the output stream it was given.
+     */
+    private static class DataProvider implements Runnable {
+
+        public static final String KEY_MODE = "mode";
+
+        /** Batch size used when the mode is {@link Mode#AGGRESSIVE}. */
+        private static final int AGGRESSIVE_BATCH_SIZE = 5000;
+        /** Minimum wall-clock time, in milliseconds, spent per batch in {@link Mode#CONTROLLED}. */
+        private static final long CONTROLLED_BATCH_MILLIS = 1000;
+
+        private final TweetGenerator tweetGenerator;
+        // volatile: written by the thread calling stop(), read by the producer thread.
+        private volatile boolean continuePush = true;
+        private final int batchSize;
+        private final Mode mode;
+        private final OutputStream os;
+
+        public enum Mode {
+            AGGRESSIVE,
+            CONTROLLED
+        }
+
+        /**
+         * @param configuration
+         *            adapter configuration; {@link #KEY_MODE} selects the mode
+         *            (default {@link Mode#AGGRESSIVE}) and, in
+         *            {@link Mode#CONTROLLED}, {@link TweetGenerator#KEY_TPS} gives the
+         *            batch size
+         * @param partition
+         *            partition number, forwarded to the generator
+         * @param os
+         *            stream registered as the generator's subscriber and closed when
+         *            this task finishes
+         * @throws IllegalArgumentException
+         *             if the mode is unknown, or if the mode is controlled and no
+         *             {@code tps} value is configured
+         */
+        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
+            this.tweetGenerator = new TweetGenerator(configuration, partition);
+            this.tweetGenerator.registerSubscriber(os);
+            this.os = os;
+            String modeValue = configuration.get(KEY_MODE);
+            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+            mode = modeValue != null ? Mode.valueOf(modeValue.toUpperCase(java.util.Locale.ROOT))
+                    : Mode.AGGRESSIVE;
+            if (mode == Mode.CONTROLLED) {
+                String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+                if (tpsValue == null) {
+                    throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+                }
+                batchSize = Integer.parseInt(tpsValue);
+            } else {
+                batchSize = AGGRESSIVE_BATCH_SIZE;
+            }
+        }
+
+        @Override
+        public void run() {
+            boolean moreData = true;
+            try {
+                while (moreData && continuePush) {
+                    try {
+                        moreData = pushNextBatch();
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag and stop producing instead of retrying.
+                        Thread.currentThread().interrupt();
+                        LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                        break;
+                    } catch (Exception e) {
+                        // Best effort: log the failure and try the next batch.
+                        LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                    }
+                }
+            } finally {
+                // Close exactly once, whatever ended the loop, so the reader sees end-of-stream.
+                try {
+                    os.close();
+                } catch (IOException e) {
+                    LOGGER.log(Level.WARNING, "Exception in adapter " + e.getMessage(), e);
+                }
+            }
+        }
+
+        /**
+         * Generates one batch; in controlled mode, sleeps for whatever is left of
+         * the per-batch time budget afterwards.
+         *
+         * @return {@code true} if the generator produced a batch
+         */
+        private boolean pushNextBatch() throws IOException, InterruptedException {
+            if (mode == Mode.AGGRESSIVE) {
+                return tweetGenerator.generateNextBatch(batchSize);
+            }
+            long startBatch = System.currentTimeMillis();
+            boolean moreData = tweetGenerator.generateNextBatch(batchSize);
+            long elapsed = System.currentTimeMillis() - startBatch;
+            if (elapsed < CONTROLLED_BATCH_MILLIS) {
+                Thread.sleep(CONTROLLED_BATCH_MILLIS - elapsed);
+            }
+            return moreData;
+        }
+
+        /**
+         * Requests that the producer loop exit once the current batch is done.
+         */
+        public void stop() {
+            continuePush = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f75e51c2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
new file mode 100644
index 0000000..936f1f8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.factory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
+ * simulates a twitter firehose with tweets being "pushed" into Asterix at a
+ * configurable rate measured in terms of TPS (tweets/second). The stream of
+ * tweets lasts for a configurable duration (measured in seconds).
+ */
+public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Degree of parallelism for feed ingestion activity. Defaults to 1. This
+     * determines the count constraint for the ingestion operator.
+     **/
+    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
+
+    /**
+     * The absolute locations where ingestion operator instances will be placed.
+     **/
+    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
+    private Map<String, String> configuration;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
+        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
+        String[] locations = null;
+        if (ingestionLocationParam != null) {
+            locations = ingestionLocationParam.split(",");
+        }
+        int count = locations != null ? locations.length : 1;
+        if (ingestionCardinalityParam != null) {
+            count = Integer.parseInt(ingestionCardinalityParam);
+        }
+
+        List<String> chosenLocations = new ArrayList<>();
+        String[] availableLocations = locations != null ? locations
+                : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
+        for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+            chosenLocations.add(availableLocations[k]);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.STREAM;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public boolean isIndexible() {
+        return false;
+    }
+
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        try {
+            return new TwitterFirehoseInputStream(configuration, partition);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}