You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:20 UTC
[4/6] apex-malhar git commit: Added Beam Examples and Implementations
of Accumulation.
Added Beam Examples and Implementations of Accumulation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dcca7752
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dcca7752
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dcca7752
Branch: refs/heads/master
Commit: dcca7752a8ee966d67602a1b7cb8fbacdb8ed59d
Parents: 266b041
Author: Shunxin <lu...@hotmail.com>
Authored: Wed Aug 24 13:12:20 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Aug 25 09:26:03 2016 -0700
----------------------------------------------------------------------
demos/highlevelapi/pom.xml | 98 +++-
.../malhar/stream/sample/MinimalWordCount.java | 126 +++++
.../malhar/stream/sample/WindowedWordCount.java | 277 ++++++++++
.../stream/sample/complete/AutoComplete.java | 308 +++++++++++
.../sample/complete/CompletionCandidate.java | 87 ++++
.../stream/sample/complete/PojoEvent.java | 44 ++
.../sample/complete/StreamingWordExtract.java | 160 ++++++
.../stream/sample/complete/TopNByKey.java | 118 +++++
.../sample/complete/TopWikipediaSessions.java | 340 ++++++++++++
.../stream/sample/complete/TrafficRoutes.java | 521 +++++++++++++++++++
.../sample/complete/TwitterAutoComplete.java | 251 +++++++++
.../sample/cookbook/CombinePerKeyExamples.java | 212 ++++----
.../stream/sample/cookbook/DeDupExample.java | 124 +++++
.../stream/sample/cookbook/InputPojo.java | 76 +++
.../sample/cookbook/MaxPerKeyExamples.java | 203 ++++++++
.../stream/sample/cookbook/OutputPojo.java | 54 ++
.../stream/sample/cookbook/TriggerExample.java | 137 +++--
.../src/main/resources/META-INF/properties.xml | 141 +++++
.../stream/sample/MinimalWordCountTest.java | 61 +++
.../stream/sample/WindowedWordCountTest.java | 90 ++++
.../sample/complete/AutoCompleteTest.java | 66 +++
.../complete/StreamingWordExtractTest.java | 144 +++++
.../complete/TopWikipediaSessionsTest.java | 73 +++
.../sample/complete/TrafficRoutesTest.java | 66 +++
.../complete/TwitterAutoCompleteTest.java | 66 +++
.../cookbook/CombinePerKeyExamplesTest.java | 56 ++
.../sample/cookbook/DeDupExampleTest.java | 59 +++
.../sample/cookbook/MaxPerKeyExamplesTest.java | 210 ++++++++
.../src/test/resources/data/word.txt | 2 +
.../src/test/resources/log4j.properties | 45 ++
.../src/test/resources/sampletweets.txt | 207 ++++++++
.../src/test/resources/wordcount/word.txt | 8 +
demos/pom.xml | 13 +-
.../lib/window/impl/accumulation/Average.java | 64 +++
.../lib/window/impl/accumulation/Count.java | 61 +++
.../lib/window/impl/accumulation/FoldFn.java | 65 +++
.../lib/window/impl/accumulation/Group.java | 63 +++
.../lib/window/impl/accumulation/Max.java | 75 +++
.../lib/window/impl/accumulation/Min.java | 76 +++
.../lib/window/impl/accumulation/ReduceFn.java | 65 +++
.../impl/accumulation/RemoveDuplicates.java | 72 +++
.../lib/window/impl/accumulation/SumDouble.java | 60 +++
.../lib/window/impl/accumulation/SumFloat.java | 60 +++
.../lib/window/impl/accumulation/SumInt.java | 60 +++
.../lib/window/impl/accumulation/SumLong.java | 60 +++
.../lib/window/impl/accumulation/TopN.java | 106 ++++
.../lib/window/impl/accumulation/TopNByKey.java | 114 ++++
.../window/impl/accumulation/AverageTest.java | 41 ++
.../window/impl/accumulation/FoldFnTest.java | 129 +++++
.../lib/window/impl/accumulation/GroupTest.java | 42 ++
.../lib/window/impl/accumulation/MaxTest.java | 53 ++
.../lib/window/impl/accumulation/MinTest.java | 53 ++
.../window/impl/accumulation/ReduceFnTest.java | 50 ++
.../impl/accumulation/RemoveDuplicatesTest.java | 42 ++
.../lib/window/impl/accumulation/SumTest.java | 57 ++
.../window/impl/accumulation/TopNByKeyTest.java | 75 +++
.../apex/malhar/stream/api/WindowedStream.java | 4 +-
.../stream/api/impl/ApexWindowedStreamImpl.java | 11 +-
.../stream/api/impl/accumulation/Count.java | 61 ---
.../stream/api/impl/accumulation/FoldFn.java | 65 ---
.../stream/api/impl/accumulation/ReduceFn.java | 65 ---
.../stream/api/impl/accumulation/TopN.java | 107 ----
stream/src/test/resources/words/word.txt | 2 +
63 files changed, 5820 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml
index c669681..cde0c83 100644
--- a/demos/highlevelapi/pom.xml
+++ b/demos/highlevelapi/pom.xml
@@ -34,21 +34,107 @@
<version>3.5.0-SNAPSHOT</version>
</parent>
- <properties>
- <skipTests>true</skipTests>
- </properties>
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.9.1</version>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>target/${project.artifactId}-${project.version}.apa</file>
+ <type>apa</type>
+ </artifact>
+ </artifacts>
+ <skipAttach>false</skipAttach>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
<dependencies>
<dependency>
- <groupId>cglib</groupId>
- <artifactId>cglib</artifactId>
- <version>3.2.1</version>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <!-- required by twitter demo -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>4.0.4</version>
+ </dependency>
+ <dependency>
+ <!-- required by twitter demo -->
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>4.0.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-stream</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.192</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.1</version>
</dependency>
+ <dependency>
+ <!--This dependency is needed for StreamingWordExtractTest-->
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>commons-compiler</artifactId>
+ <version>2.7.8</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <!--This dependency is needed for StreamingWordExtractTest-->
+ <groupId>org.codehaus.janino</groupId>
+ <artifactId>janino</artifactId>
+ <version>2.7.8</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
new file mode 100644
index 0000000..671cc72
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam MinimalWordCount Example
+ */
+@ApplicationAnnotation(name = "MinimalWordCount")
+public class MinimalWordCount implements StreamingApplication
+{
+ /**
+  * Terminal operator that remembers the most recent count received for every word.
+  * All state lives in static fields, so it is shared by every instance in the JVM and is
+  * reset each time {@link #setup} runs.
+  */
+ public static class Collector extends BaseOperator
+ {
+   // Latest count per word; package-private and static, so it is reachable without an instance.
+   static Map<String, Long> result;
+   // volatile: written in process() and exposed through the public static isDone(), which may
+   // be called from a thread other than the one running the operator.
+   private static volatile boolean done = false;
+
+   /**
+    * @return true once a tuple whose key is "bye" has been processed since the last setup
+    */
+   public static boolean isDone()
+   {
+     return done;
+   }
+
+   @Override
+   public void setup(Context.OperatorContext context)
+   {
+     done = false;
+     result = new HashMap<>();
+   }
+
+   public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
+   {
+     @Override
+     public void process(KeyValPair<String, Long> tuple)
+     {
+       // Store the entry before raising the flag, so that whoever observes done == true
+       // also finds the "bye" entry in the map.
+       result.put(tuple.getKey(), tuple.getValue());
+       // Constant-first comparison keeps a null key from throwing.
+       if ("bye".equals(tuple.getKey())) {
+         done = true;
+       }
+     }
+   };
+ }
+
+ /**
+ * Populate the dag using High-Level API: build a stream from the wordcount test folder,
+ * split it into words, count every word and pass the counts to a {@link Collector}.
+ * @param dag the DAG that the assembled stream is populated into
+ * @param conf the application configuration; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Collector collector = new Collector();
+ // Create a stream reading from a file line by line using StreamFactory.
+ StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+ // Use a flatmap transformation to extract words from the incoming stream of lines.
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ // Split on every run of characters that is neither an ASCII letter nor an apostrophe.
+ return Arrays.asList(input.split("[^a-zA-Z']+"));
+
+ }
+ }, name("ExtractWords"))
+ // Apply windowing to the stream for counting, in this case, the window option is global window.
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+ // Count the appearances of every word.
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ // Every word is paired with 1 as its contribution to the count.
+ return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L));
+ }
+ }, name("countByKey"))
+ // Format the counting result to a readable format by unwrapping the tuples.
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
+ {
+ @Override
+ public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ return input.getValue();
+ }
+ }, name("FormatResults"))
+ // Print the result.
+ .print()
+ // Attach a collector to the stream to collect results.
+ .endWith(collector, collector.input, name("Collector"))
+ // populate the dag using the stream.
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
new file mode 100644
index 0000000..6a6777e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam WindowedWordCount Example.
+ */
+@ApplicationAnnotation(name = "WindowedWordCount")
+public class WindowedWordCount implements StreamingApplication
+{
+ static final int WINDOW_SIZE = 1; // Default window duration in minutes
+
+ /**
+  * An input operator that reads the classpath resource {@code /wordcount/word.txt} and emits
+  * it downstream one line per {@link #emitTuples()} call, sleeping 50 ms after every call.
+  */
+ public static class TextInput extends BaseOperator implements InputOperator
+ {
+   // volatile: written in emitTuples() and exposed through the public static isDone(), which
+   // may be called from a thread other than the one running the operator.
+   private static volatile boolean done = false;
+
+   public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+   // Set back to null once the resource has been read to its end and the reader closed.
+   private transient BufferedReader reader;
+
+   /**
+    * @return true once the end of the input has been reached since the last {@link #setup}
+    */
+   public static boolean isDone()
+   {
+     return done;
+   }
+
+   @Override
+   public void setup(Context.OperatorContext context)
+   {
+     done = false;
+     initReader();
+   }
+
+   /**
+    * Opens the bundled text resource.
+    *
+    * @throws IllegalStateException if the resource is not on the classpath
+    */
+   private void initReader()
+   {
+     InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt");
+     if (resourceStream == null) {
+       // Fail with a message that names the resource instead of a bare NullPointerException.
+       throw new IllegalStateException("Resource /wordcount/word.txt was not found on the classpath");
+     }
+     reader = new BufferedReader(new InputStreamReader(resourceStream));
+   }
+
+   @Override
+   public void teardown()
+   {
+     IOUtils.closeQuietly(reader);
+   }
+
+   /**
+    * Emits the next line of the resource, if any. Once the end has been reached the reader is
+    * closed and later calls only sleep; they must not read again, because reading from a
+    * closed {@link BufferedReader} throws an {@link IOException}.
+    */
+   @Override
+   public void emitTuples()
+   {
+     try {
+       if (reader == null) {
+         // Input already exhausted: idle instead of touching the closed reader.
+         Thread.sleep(1000);
+         return;
+       }
+       String line = reader.readLine();
+       if (line == null) {
+         done = true;
+         reader.close();
+         reader = null;
+         Thread.sleep(1000);
+       } else {
+         this.output.emit(line);
+       }
+       Thread.sleep(50);
+     } catch (IOException ex) {
+       throw new RuntimeException(ex);
+     } catch (InterruptedException e) {
+       // Restore the interrupt flag before propagating so callers can still observe it.
+       Thread.currentThread().interrupt();
+       throw Throwables.propagate(e);
+     }
+   }
+ }
+
+ /**
+  * Sink operator that stores the count of every incoming {@link PojoEvent} in a static map
+  * keyed by the event's (timestamp, word) pair.
+  */
+ public static class Collector extends BaseOperator
+ {
+   private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+
+   /**
+    * @return the map the operator writes into (the live map, not a copy)
+    */
+   public static Map<KeyValPair<Long, String>, Long> getResult()
+   {
+     return result;
+   }
+
+   public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
+   {
+     @Override
+     public void process(PojoEvent tuple)
+     {
+       KeyValPair<Long, String> timestampAndWord =
+           new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord());
+       result.put(timestampAndWord, tuple.getCount());
+     }
+   };
+ }
+
+ /**
+  * Plain bean describing one output row: a word, how often it was counted and a timestamp.
+  */
+ public static class PojoEvent
+ {
+   private String word;
+   private long count;
+   private long timestamp;
+
+   public String getWord()
+   {
+     return word;
+   }
+
+   public void setWord(String word)
+   {
+     this.word = word;
+   }
+
+   public long getCount()
+   {
+     return count;
+   }
+
+   public void setCount(long count)
+   {
+     this.count = count;
+   }
+
+   public long getTimestamp()
+   {
+     return timestamp;
+   }
+
+   public void setTimestamp(long timestamp)
+   {
+     this.timestamp = timestamp;
+   }
+
+   /**
+    * @return the three properties rendered as {@code PojoEvent (word=..., count=..., timestamp=...)}
+    */
+   @Override
+   public String toString()
+   {
+     StringBuilder text = new StringBuilder("PojoEvent (word=");
+     text.append(getWord());
+     text.append(", count=").append(getCount());
+     text.append(", timestamp=").append(getTimestamp());
+     return text.append(")").toString();
+   }
+ }
+
+ /**
+  * A map function that wraps the input string in a tuple carrying a randomly generated timestamp.
+  */
+ public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+ {
+   // Width of the interval the random timestamps are drawn from.
+   private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+   // Lower bound of that interval: the wall-clock time at which this function was created.
+   private final Long minTimestamp;
+
+   AddTimestampFn()
+   {
+     this.minTimestamp = System.currentTimeMillis();
+   }
+
+   /**
+    * @param input the value to wrap
+    * @return {@code input} stamped with a time between the creation time of this function
+    *         (inclusive) and RAND_RANGE later (exclusive)
+    */
+   @Override
+   public Tuple.TimestampedTuple<String> f(String input)
+   {
+     // Math.random() is in [0, 1), so the offset lies in [0, RAND_RANGE).
+     final long offsetMillis = (long)(RAND_RANGE.getMillis() * Math.random());
+     return new Tuple.TimestampedTuple<>(minTimestamp + offsetMillis, input);
+   }
+ }
+
+ /**
+  * A MapFunction that turns a windowed (word, count) pair into a {@link PojoEvent} carrying
+  * the word, the count and the tuple's timestamp.
+  */
+ public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
+ {
+   @Override
+   public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+   {
+     PojoEvent event = new PojoEvent();
+     event.setTimestamp(input.getTimestamp());
+
+     KeyValPair<String, Long> wordAndCount = input.getValue();
+     event.setCount(wordAndCount.getValue());
+     event.setWord(wordAndCount.getKey());
+     return event;
+   }
+ }
+
+ /**
+ * Populate dag with High-Level API: lines from {@link TextInput} are split into words,
+ * stamped with random timestamps, counted per word in one-minute time windows and sent
+ * to a {@link Collector} as {@link PojoEvent}s.
+ * @param dag the DAG that the assembled stream is populated into
+ * @param conf the application configuration; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TextInput input = new TextInput();
+ Collector collector = new Collector();
+
+ // Create stream from the TextInput operator.
+ ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
+
+ // Extract all the words from the input line of text.
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ // Split on every run of punctuation and/or whitespace characters.
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+ }
+ }, name("ExtractWords"))
+
+ // Wrap the word with a randomly generated timestamp.
+ .map(new AddTimestampFn(), name("AddTimestampFn"));
+
+
+ // apply window and trigger option.
+ // TODO: change trigger option to atWaterMark when available.
+ WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
+ .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
+ new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
+
+
+ WindowedStream<PojoEvent> wordCounts =
+ // Perform a countByKey transformation to count the appearance of each word in every time window.
+ windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+ {
+ // Keep the word's timestamp and pair the word with 1 as its contribution to the count.
+ return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
+ new KeyValPair<String, Long>(input.getValue(), 1L));
+ }
+ }, name("count words"))
+
+ // Format the output and print out the result.
+ .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print();
+
+ // Attach the collector as the final operator and populate the dag from the stream.
+ wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
new file mode 100644
index 0000000..29c8cf9
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ * This application is identical to TwitterAutoComplete, except it's
+ * reading from a file. This application is mainly for local testing
+ * purpose.
+ *
+ * <p>This will update the datastore every 10 seconds based on the last
+ * 30 minutes of data received.
+ */
+@ApplicationAnnotation(name = "AutoComplete")
+public class AutoComplete implements StreamingApplication
+{
+
+ /**
+  * A dummy Twitter input operator. It reads the classpath resource {@code /sampletweets.txt}
+  * and emits one line per {@link #emitTuples()} call, sleeping 50 ms after every call.
+  */
+ public static class TweetsInput extends BaseOperator implements InputOperator
+ {
+   // volatile: written in emitTuples() and exposed through the public static isDone(), which
+   // may be called from a thread other than the one running the operator.
+   private static volatile boolean done = false;
+   public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+   // Set back to null once the resource has been read to its end and the reader closed.
+   private transient BufferedReader reader;
+
+   /**
+    * @return true once the end of the input has been reached since the last {@link #setup}
+    */
+   public static boolean isDone()
+   {
+     return done;
+   }
+
+   @Override
+   public void setup(OperatorContext context)
+   {
+     done = false;
+     initReader();
+   }
+
+   /**
+    * Opens the bundled tweets resource.
+    *
+    * @throws IllegalStateException if the resource is not on the classpath
+    */
+   private void initReader()
+   {
+     InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
+     if (resourceStream == null) {
+       // Fail with a message that names the resource instead of a bare NullPointerException.
+       throw new IllegalStateException("Resource /sampletweets.txt was not found on the classpath");
+     }
+     reader = new BufferedReader(new InputStreamReader(resourceStream));
+   }
+
+   @Override
+   public void teardown()
+   {
+     IOUtils.closeQuietly(reader);
+   }
+
+   /**
+    * Emits the next line of the resource, if any. Once the end has been reached the reader is
+    * closed and later calls only sleep; they must not read again, because reading from a
+    * closed {@link BufferedReader} throws an {@link IOException}.
+    */
+   @Override
+   public void emitTuples()
+   {
+     try {
+       if (reader == null) {
+         // Input already exhausted: idle instead of touching the closed reader.
+         Thread.sleep(1000);
+         return;
+       }
+       String line = reader.readLine();
+       if (line == null) {
+         done = true;
+         reader.close();
+         reader = null;
+         Thread.sleep(1000);
+       } else {
+         this.output.emit(line);
+       }
+       Thread.sleep(50);
+     } catch (IOException ex) {
+       throw new RuntimeException(ex);
+     } catch (InterruptedException ignored) {
+       // Still best-effort (no exception is raised), but keep the interrupt flag set so the
+       // thread that owns this operator can see that it was asked to stop.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ /**
+  * Sink operator that keeps, for every prefix, the candidate list of the most recent
+  * tuple received for that prefix. The map is static and therefore shared JVM-wide.
+  */
+ public static class Collector extends BaseOperator
+ {
+   private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+
+   /**
+    * @return the map the operator writes into (the live map, not a copy)
+    */
+   public static Map<String, List<CompletionCandidate>> getResult()
+   {
+     return result;
+   }
+
+   public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
+   {
+     @Override
+     public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
+     {
+       final String prefix = tuple.getValue().getKey();
+       final List<CompletionCandidate> candidates = tuple.getValue().getValue();
+       result.put(prefix, candidates);
+     }
+   };
+ }
+
+ /**
+  * FlatMap function that extracts all hashtags from the text of a tweet.
+  */
+ private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+ {
+   // Compiled once and shared by all calls: '#' followed by one or more non-whitespace characters.
+   private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+   /**
+    * @param input the text of one tweet
+    * @return the hashtags found in {@code input}, in order of appearance and without the
+    *         leading '#'; empty when there are none
+    */
+   @Override
+   public Iterable<String> f(String input)
+   {
+     List<String> hashtags = new LinkedList<>();
+     Matcher matcher = HASHTAG.matcher(input);
+     while (matcher.find()) {
+       // Drop the leading '#'.
+       hashtags.add(matcher.group().substring(1));
+     }
+     return hashtags;
+   }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ *
+ * Composite transform that expands every CompletionCandidate into (prefix, candidate) pairs
+ * via {@link AllPrefixes} and accumulates them per prefix with a TopNByKey accumulation.
+ */
+ private static class ComputeTopFlat
+ extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ // NOTE(review): stored by the constructor but never read in compose(); TopNByKey is created
+ // with its no-arg constructor, so this value presumably has no effect - confirm.
+ private final int candidatesPerPrefix;
+ // Shortest prefix length generated for a candidate; handed to AllPrefixes.
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+ WindowedStream<CompletionCandidate> input)
+ {
+ return input
+ .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
+ .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
+ CompletionCandidate>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+ {
+ // TODO: Should be removed after Auto-wrapping is supported.
+ // Until then every pair is wrapped by hand in a tuple assigned to the global window.
+ return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+ }
+ });
+ }
+ }
+
+ /**
+ * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+ * KeyValPairs of the prefix and the CompletionCandidate
+ */
+ private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+ {
+ private final int minPrefix;
+ private final int maxPrefix;
+
+ public AllPrefixes()
+ {
+ this(0, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix)
+ {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix, int maxPrefix)
+ {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+
+ @Override
+ public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+ {
+ List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+ String word = input.getValue();
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+
+ result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+ }
+ return result;
+ }
+ }
+
+ /**
+  * A composite stream transform that takes as input a stream of tokens and returns
+  * the most common tokens per prefix.
+  */
+ public static class ComputeTopCompletions
+   extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+   // Number of candidates requested per prefix; forwarded to ComputeTopFlat by compose().
+   private final int candidatesPerPrefix;
+   // NOTE(review): stored but not consulted by compose(), which always uses ComputeTopFlat.
+   private final boolean recursive;
+
+   protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+   {
+     this.candidatesPerPrefix = candidatesPerPrefix;
+     this.recursive = recursive;
+   }
+
+   /**
+    * Factory method.
+    *
+    * @param candidatesPerPrefix number of candidates requested per prefix
+    * @param recursive kept for the caller's convenience; currently not used by compose()
+    * @return a new transform configured with the given values
+    */
+   public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+   {
+     return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+   }
+
+   /**
+    * Counts the occurrences of every token, converts each (token, count) pair into a
+    * CompletionCandidate and hands the candidates to ComputeTopFlat.
+    *
+    * @param inputStream windowed stream of tokens
+    * @return the stream produced by ComputeTopFlat for the counted candidates
+    */
+   @Override
+   @SuppressWarnings("unchecked")
+   public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+   {
+     ApexStream<CompletionCandidate> candidates = inputStream
+       .countByKey(new Function.ToKeyValue<String, String, Long>()
+       {
+         @Override
+         public Tuple<KeyValPair<String, Long>> f(String input)
+         {
+           // Every token is paired with 1 as its contribution to the count.
+           return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+         }
+       }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+       {
+         @Override
+         public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+         {
+           return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+         }
+       }, name("ToCompletionCandidate"));
+
+     // Pass the configured candidate count on instead of a hard-coded 10; the minimum
+     // prefix length stays 1.
+     return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+
+   }
+ }
+
+ /**
+ * Populate the dag with High-Level API: hashtags are extracted from the lines emitted by
+ * {@link TweetsInput}, windowed globally, turned into per-prefix completions by
+ * {@link ComputeTopCompletions} and sent to a {@link Collector}.
+ * @param dag the DAG that the assembled stream is populated into
+ * @param conf the application configuration; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TweetsInput input = new TweetsInput();
+ Collector collector = new Collector();
+
+ WindowOption windowOption = new WindowOption.GlobalWindow();
+
+ // Turn every input line into the hashtags it contains.
+ ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
+ .flatMap(new ExtractHashtags());
+
+ // Window the hashtags, compute the completions and attach the collector as the final operator.
+ tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+ .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector"))
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
new file mode 100644
index 0000000..8a7113e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Class used to store tag-count pairs in Auto Complete Demo.
+ */
+public class CompletionCandidate implements Comparable<CompletionCandidate>
+{
+ private long count;
+ private String value;
+
+ public CompletionCandidate(String value, long count)
+ {
+ this.value = value;
+ this.count = count;
+ }
+
+ public long getCount()
+ {
+ return count;
+ }
+
+ public String getValue()
+ {
+ return value;
+ }
+
+ // Empty constructor required for Kryo.
+ public CompletionCandidate()
+ {
+
+ }
+
+ @Override
+ public int compareTo(CompletionCandidate o)
+ {
+ if (this.count < o.count) {
+ return -1;
+ } else if (this.count == o.count) {
+ return this.value.compareTo(o.value);
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (other instanceof CompletionCandidate) {
+ CompletionCandidate that = (CompletionCandidate)other;
+ return this.count == that.count && this.value.equals(that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Long.valueOf(count).hashCode() ^ value.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompletionCandidate[" + value + ", " + count + "]";
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
new file mode 100644
index 0000000..2a4c003
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Tuple Class for JdbcOutput of StreamingWordExtract.
+ */
+public class PojoEvent extends Object
+{
+ private String stringValue;
+
+ @Override
+ public String toString()
+ {
+ return "PojoEvent [stringValue=" + getStringValue() + "]";
+ }
+
+ public void setStringValue(String newString)
+ {
+ this.stringValue = newString;
+ }
+
+ public String getStringValue()
+ {
+ return this.stringValue;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..2ffdc82
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.Locale;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static java.sql.Types.VARCHAR;
+
+/**
+ * Beam StreamingWordExtract Example.
+ */
+@ApplicationAnnotation(name = "StreamingWordExtract")
+public class StreamingWordExtract implements StreamingApplication
+{
+ private static int wordCount = 0; // A counter to count number of words have been extracted.
+ private static int entriesMapped = 0; // A counter to count number of entries have been mapped.
+
+ public int getWordCount()
+ {
+ return wordCount;
+ }
+
+ public int getEntriesMapped()
+ {
+ return entriesMapped;
+ }
+
+ /**
+ * A MapFunction that tokenizes lines of text into individual words.
+ */
+ public static class ExtractWords implements Function.FlatMapFunction<String, String>
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
+ wordCount += result.size();
+ return result;
+ }
+ }
+
+
+ /**
+ * A MapFunction that uppercases a word.
+ */
+ public static class Uppercase implements Function.MapFunction<String, String>
+ {
+ @Override
+ public String f(String input)
+ {
+ return input.toUpperCase();
+ }
+ }
+
+
+ /**
+ * A filter function to filter out empty strings.
+ */
+ public static class EmptyStringFilter implements Function.FilterFunction<String>
+ {
+ @Override
+ public boolean f(String input)
+ {
+ return !input.isEmpty();
+ }
+ }
+
+
+ /**
+ * A map function to map the result string to a pojo entry.
+ */
+ public static class PojoMapper implements Function.MapFunction<String, Object>
+ {
+
+ @Override
+ public Object f(String input)
+ {
+ PojoEvent pojo = new PojoEvent();
+ pojo.setStringValue(input);
+ entriesMapped++;
+ return pojo;
+ }
+ }
+
+ /**
+ * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
+ */
+ private static List<JdbcFieldInfo> addFieldInfos()
+ {
+ List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+ return fieldInfos;
+ }
+
+ /**
+ * Populate dag with High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+ jdbcOutput.setFieldInfos(addFieldInfos());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ jdbcOutput.setStore(outputStore);
+ jdbcOutput.setTablename("TestTable");
+
+ // Create a stream reading from a folder.
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
+
+ // Extract all the words from the input line of text.
+ stream.flatMap(new ExtractWords())
+
+ // Filter out the empty strings.
+ .filter(new EmptyStringFilter())
+
+ // Change every word to uppercase.
+ .map(new Uppercase())
+
+ // Map the resulted word to a Pojo entry.
+ .map(new PojoMapper())
+
+ // Output the entries to JdbcOutput and insert them into a table.
+ .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
+
+ stream.populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
new file mode 100644
index 0000000..a9e7744
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Specialized TopNByKey accumulation for AutoComplete Demo.
+ */
+public class TopNByKey implements
+ Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
+{
+ int n = 10;
+
+ Comparator comparator;
+
+ public void setN(int n)
+ {
+ this.n = n;
+ }
+
+ public void setComparator(Comparator comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public Map<String, Long> defaultAccumulatedValue()
+ {
+ return new HashMap<>();
+ }
+
+ /**
+ * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a
+ * new entry otherwise.
+ * @param accumulatedValue
+ * @param input
+ * @return
+ */
+ @Override
+ public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
+ {
+ accumulatedValue.put(input.getValue(), input.getCount());
+ return accumulatedValue;
+ }
+
+ /**
+ * Merge two Maps together. For every key, keep the larger value in the resulted Map.
+ * @param accumulatedValue1
+ * @param accumulatedValue2
+ * @return
+ */
+ @Override
+ public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
+ {
+ for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
+ if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) {
+ continue;
+ }
+ accumulatedValue1.put(entry.getKey(), entry.getValue());
+ }
+ return accumulatedValue1;
+ }
+
+ /**
+ * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing
+ * those entries.
+ * @param accumulatedValue
+ * @return
+ */
+ @Override
+ public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
+ {
+ LinkedList<CompletionCandidate> result = new LinkedList<>();
+ for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
+ int k = 0;
+ for (CompletionCandidate inMemory : result) {
+ if (entry.getValue() > inMemory.getCount()) {
+ break;
+ }
+ k++;
+ }
+ result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue()));
+ if (result.size() > n) {
+ result.remove(result.get(result.size() - 1));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
+ {
+ return new LinkedList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..de4e590
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TopWikipediaSessions Example.
+ */
+@ApplicationAnnotation(name = "TopWikipediaSessions")
+public class TopWikipediaSessions implements StreamingApplication
+{
+ /**
+ * A generator that outputs a stream of combinations of some users and some randomly generated edit time.
+ */
+ public static class SessionGen extends BaseOperator implements InputOperator
+ {
+ private String[] names = new String[]{"user1", "user2", "user3", "user4"};
+ public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
+
+ private static final Duration RAND_RANGE = Duration.standardDays(365);
+ private Long minTimestamp;
+ private long sleepTime;
+ private static int tupleCount = 0;
+
+ public static int getTupleCount()
+ {
+ return tupleCount;
+ }
+
+ private String randomName(String[] names)
+ {
+ int index = new Random().nextInt(names.length);
+ return names[index];
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ tupleCount = 0;
+ minTimestamp = System.currentTimeMillis();
+ sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+ long randomTimestamp = minTimestamp + randMillis;
+ output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
+ tupleCount++;
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // Ignore it.
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private final int resultSize = 5;
+ private static List<List<TempWrapper>> result = new ArrayList<>();
+
+ public static List<List<TempWrapper>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
+ {
+ if (result.size() == resultSize) {
+ result.remove(0);
+ }
+ result.add(tuple.getValue());
+ }
+ };
+ }
+
+
+ /**
+ * Convert the upstream (user, time) combination to a timestamped tuple of user.
+ */
+ static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
+ {
+ @Override
+ public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
+ {
+ long timestamp = input.getValue();
+ String userName = input.getKey();
+
+ // Sets the implicit timestamp field to be used in windowing.
+ return new Tuple.TimestampedTuple<>(timestamp, userName);
+
+ }
+ }
+
+ /**
+ * Computes the number of edits in each user session. A session is defined as
+ * a string of edits where each is separated from the next by less than an hour.
+ */
+ static class ComputeSessions
+ extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
+ {
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
+ {
+ return inputStream
+
+ // Chuck the stream into session windows.
+ .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // Count the number of edits for a user within one session.
+ .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+ {
+ @Override
+ public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+ {
+ return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
+ }
+ }, name("ComputeSessions"));
+ }
+ }
+
+ /**
+ * A comparator class used for comparing two TempWrapper objects.
+ */
+ public static class Comp implements Comparator<TempWrapper>
+ {
+ @Override
+ public int compare(TempWrapper o1, TempWrapper o2)
+ {
+ return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
+ }
+ }
+
+ /**
+ * A function to extract timestamp from a TempWrapper object.
+ */
+ // TODO: Need to revisit and change back to using TimestampedTuple.
+ public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
+ {
+ @Override
+ public Long apply(@Nullable TempWrapper input)
+ {
+ return input.getTimestamp();
+ }
+ }
+
+ /**
+ * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
+ * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can
+ * remove this class.
+ */
+ public static class TempWrapper
+ {
+ private KeyValPair<String, Long> value;
+ private Long timestamp;
+
+ public TempWrapper()
+ {
+
+ }
+
+ public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
+ {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.value + " - " + this.timestamp;
+ }
+
+ public Long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public KeyValPair<String, Long> getValue()
+ {
+ return value;
+ }
+
+ public void setValue(KeyValPair<String, Long> value)
+ {
+ this.value = value;
+ }
+ }
+
+ /**
+ * Computes the longest session ending in each month, in this case we use 30 days to represent every month.
+ */
+ private static class TopPerMonth
+ extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+ {
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
+ {
+ TopN<TempWrapper> topN = new TopN<>();
+ topN.setN(10);
+ topN.setComparator(new Comp());
+
+ return inputStream
+
+ // Map the input WindowedTuple to a TempWrapper object.
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
+ {
+ @Override
+ public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
+ }
+ }, name("TempWrapper"))
+
+ // Apply window and trigger option again, this time chuck the stream into fixed time windows.
+ .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
+
+ // Compute the top 10 user-sessions with most number of edits.
+ .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
+ }
+ }
+
+ /**
+ * A map function that combine the user and his/her edit session together to a string and use that string as a key
+ * with number of edits in that session as value to create a new key value pair to send to downstream.
+ */
+ static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
+ {
+ @Override
+ public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), new KeyValPair<String, Long>(
+ input.getValue().getKey() + " : " + input.getWindows().get(0).getBeginTimestamp() + " : " + input.getWindows().get(0).getDurationMillis(),
+ input.getValue().getValue()));
+ }
+ }
+
+ /**
+ * A flapmap function that turns the result into readable format.
+ */
+ static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
+ {
+ @Override
+ public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
+ {
+ ArrayList<String> result = new ArrayList<>();
+ for (TempWrapper item : input.getValue()) {
+ String session = item.getValue().getKey();
+ long count = item.getValue().getValue();
+ result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp());
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A composite transform that compute the top wikipedia sessions.
+ */
+ public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+ {
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
+ {
+ return inputStream
+ .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
+ .addCompositeStreams(new ComputeSessions())
+ .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
+ .addCompositeStreams(new TopPerMonth());
+ }
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ SessionGen sg = new SessionGen();
+ Collector collector = new Collector();
+ StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
+ .addCompositeStreams(new ComputeTopSessions())
+ .endWith(collector, collector.input, name("collector")).populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
new file mode 100644
index 0000000..2cc04d1
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -0,0 +1,521 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.Group;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TrafficRoutes example.
+ */
+@ApplicationAnnotation(name = "TrafficRoutes")
+public class TrafficRoutes implements StreamingApplication
+{
+ // Lookup from station id to route name, consulted by ExtractStationSpeedFn; filled once by buildStationInfo().
+ static Map<String, String> sdStations = buildStationInfo();
+ static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
+ static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
+
+ /**
+  * This class holds information about a station reading's average speed.
+  *
+  * <p>Readings order by timestamp. All fields may be {@code null}, e.g. on an instance created through
+  * the no-arg constructor; a reading without a timestamp sorts before any reading that has one.
+  */
+ public static class StationSpeed implements Comparable<StationSpeed>
+ {
+   @Nullable
+   String stationId;
+   @Nullable
+   Double avgSpeed;
+   @Nullable
+   Long timestamp;
+
+   public StationSpeed() {}
+
+   public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
+   {
+     this.stationId = stationId;
+     this.avgSpeed = avgSpeed;
+     this.timestamp = timestamp;
+   }
+
+   public void setAvgSpeed(@Nullable Double avgSpeed)
+   {
+     this.avgSpeed = avgSpeed;
+   }
+
+   public void setStationId(@Nullable String stationId)
+   {
+     this.stationId = stationId;
+   }
+
+   public void setTimestamp(@Nullable Long timestamp)
+   {
+     this.timestamp = timestamp;
+   }
+
+   @Nullable
+   public Long getTimestamp()
+   {
+     return timestamp;
+   }
+
+   public String getStationId()
+   {
+     return this.stationId;
+   }
+
+   public Double getAvgSpeed()
+   {
+     return this.avgSpeed;
+   }
+
+   /**
+    * Orders readings by timestamp, treating a missing timestamp as smaller than any present one.
+    *
+    * @param other the reading to compare against
+    * @return a negative value, zero or a positive value as this reading is earlier than, at the same time
+    *         as or later than {@code other}
+    */
+   @Override
+   public int compareTo(StationSpeed other)
+   {
+     // The timestamp is a nullable Long; unboxing it blindly would throw a NullPointerException.
+     if (this.timestamp == null) {
+       return other.timestamp == null ? 0 : -1;
+     }
+     if (other.timestamp == null) {
+       return 1;
+     }
+     return Long.compare(this.timestamp, other.timestamp);
+   }
+ }
+
+ /**
+  * Holder for what was computed about one route: its name, its average speed and whether a slowdown
+  * was detected. Every field may be {@code null}.
+  */
+ static class RouteInfo
+ {
+   @Nullable
+   String route;
+   @Nullable
+   Double avgSpeed;
+   @Nullable
+   Boolean slowdownEvent;
+
+   public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
+   {
+     this.route = route;
+     this.avgSpeed = avgSpeed;
+     this.slowdownEvent = slowdownEvent;
+   }
+
+   // No-arg constructor: leaves all fields null.
+   public RouteInfo()
+   {
+   }
+
+   public String getRoute()
+   {
+     return route;
+   }
+
+   public Double getAvgSpeed()
+   {
+     return avgSpeed;
+   }
+
+   public Boolean getSlowdownEvent()
+   {
+     return slowdownEvent;
+   }
+ }
+
+ /**
+  * Wraps each comma-separated input line in a {@link Tuple.TimestampedTuple} whose timestamp is the value
+  * that {@code tryParseTimestamp} picks out of the line's fields, parsed as a long.
+  */
+ static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+ {
+
+   @Override
+   public Tuple.TimestampedTuple<String> f(String input)
+   {
+     String[] fields = input.split(",");
+     // NOTE(review): if tryParseTimestamp can return null, parseLong throws NumberFormatException here.
+     long eventTime = Long.parseLong(tryParseTimestamp(fields));
+
+     return new Tuple.TimestampedTuple<>(eventTime, input);
+   }
+ }
+
+ /**
+  * Keeps only readings of station type "ML" whose station is listed in {@code sdStations}, and emits each
+  * of them as a (route, station speed) pair. Every other reading produces an empty result.
+  */
+ static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
+ {
+
+   @Override
+   public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
+   {
+     ArrayList<KeyValPair<String, StationSpeed>> routed = new ArrayList<>();
+     String[] fields = input.getValue().split(",");
+
+     // For this analysis, use only 'main line' station types.
+     if (!"ML".equals(tryParseStationType(fields))) {
+       return routed;
+     }
+
+     Double speed = tryParseAvgSpeed(fields);
+     String station = tryParseStationId(fields);
+     // For this simple example, filter out everything but some hardwired routes.
+     if (speed == null || station == null || !sdStations.containsKey(station)) {
+       return routed;
+     }
+
+     StationSpeed reading = new StationSpeed(station, speed, input.getTimestamp());
+     // The tuple key is the 'route' name stored in the 'sdStations' hash.
+     KeyValPair<String, StationSpeed> keyedByRoute = new KeyValPair<>(sdStations.get(station), reading);
+     routed.add(keyedByRoute);
+     return routed;
+   }
+ }
+
+ /**
+  * For a given route, computes the average of the non-null speeds in the window and flags a slowdown.
+  * Readings are visited in timestamp order; per station, each reading is compared with that station's
+  * previous one and counted as a speedup when it is strictly faster, as a slowdown otherwise. The route is
+  * flagged when slowdowns are at least twice the speedups. Nothing is emitted when no speed is available.
+  * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+  */
+ static class GatherStats
+     implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
+ {
+   @Override
+   public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
+   {
+     ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> emitted = new ArrayList<>();
+     String routeName = input.getValue().getKey();
+
+     // Work on a copy, ordered by the timestamp embedded in each reading.
+     List<StationSpeed> readings = Lists.newArrayList(input.getValue().getValue());
+     Collections.sort(readings);
+
+     Map<String, Double> latestByStation = new HashMap<>();
+     double total = 0.0;
+     int counted = 0;
+     int faster = 0;
+     int notFaster = 0;
+     for (StationSpeed reading : readings) {
+       Double current = reading.getAvgSpeed();
+       if (current == null) {
+         continue;
+       }
+       total += current;
+       counted++;
+       // put() hands back the station's previous speed, or null for its first reading.
+       Double previous = latestByStation.put(reading.getStationId(), current);
+       if (previous != null) {
+         if (previous < current) {
+           faster++;
+         } else {
+           notFaster++;
+         }
+       }
+     }
+
+     if (counted == 0) {
+       // No average to compute.
+       return emitted;
+     }
+
+     double mean = total / counted;
+     boolean slowdownEvent = notFaster >= 2 * faster;
+     RouteInfo summary = new RouteInfo(routeName, mean, slowdownEvent);
+     emitted.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(routeName, summary)));
+     return emitted;
+   }
+ }
+
+ /**
+ * Plain output bean carrying one route's result: route key, timestamp, average speed and slowdown flag.
+ * Output Pojo class for outputting result to JDBC.
+ */
+ static class OutputPojo
+ {
+   private Double avgSpeed;
+   private Boolean slowdownEvent;
+   private String key;
+   private Long timestamp;
+
+   /** No-arg constructor; all fields start out as {@code null}. */
+   public OutputPojo()
+   {
+   }
+
+   /** Creates a fully populated bean. */
+   public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
+   {
+     this.avgSpeed = avgSpeed;
+     this.slowdownEvent = slowdownEvent;
+     this.key = key;
+     this.timestamp = timestamp;
+   }
+
+   public String getKey()
+   {
+     return key;
+   }
+
+   public void setKey(String key)
+   {
+     this.key = key;
+   }
+
+   public Long getTimestamp()
+   {
+     return timestamp;
+   }
+
+   public void setTimestamp(Long timestamp)
+   {
+     this.timestamp = timestamp;
+   }
+
+   public Double getAvgSpeed()
+   {
+     return avgSpeed;
+   }
+
+   public void setAvgSpeed(Double avgSpeed)
+   {
+     this.avgSpeed = avgSpeed;
+   }
+
+   public Boolean getSlowdownEvent()
+   {
+     return slowdownEvent;
+   }
+
+   public void setSlowdownEvent(Boolean slowdownEvent)
+   {
+     this.slowdownEvent = slowdownEvent;
+   }
+
+   /** Renders the bean as "key + avgSpeed + slowdownEvent + timestamp". */
+   @Override
+   public String toString()
+   {
+     StringBuilder text = new StringBuilder();
+     text.append(key).append(" + ");
+     text.append(avgSpeed).append(" + ");
+     text.append(slowdownEvent).append(" + ");
+     text.append(timestamp);
+     return text.toString();
+   }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
+
+ public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
+ {
+ @Override
+ public void process(OutputPojo tuple)
+ {
+ result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
+ }
+ };
+ }
+
+ /**
+ * Map function that flattens a timestamped (route, RouteInfo) pair into an {@link OutputPojo} holding
+ * the route's average speed, its slowdown flag, the route key and the tuple's timestamp.
+ */
+ static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
+ {
+   @Override
+   public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
+   {
+     KeyValPair<String, RouteInfo> routeEntry = input.getValue();
+     RouteInfo stats = routeEntry.getValue();
+     return new OutputPojo(stats.getAvgSpeed(), stats.getSlowdownEvent(), routeEntry.getKey(), input.getTimestamp());
+   }
+ }
+
+
+ /**
+ * Composite transformation over (route, station speed) pairs: it groups the readings per route,
+ * runs the slowdown analysis on each group, and formats each result as an {@link OutputPojo}
+ * (the shape used for the JDBC output).
+ */
+ static class TrackSpeed extends
+   CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
+ {
+   @Override
+   public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
+   {
+     // Wraps each (route, reading) pair in a tuple stamped with the reading's own timestamp.
+     Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed> stampWithReadingTime =
+       new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
+       {
+         @Override
+         public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
+         {
+           return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
+         }
+       };
+
+     // Group-by-key step: collect a list of all station readings for a given route.
+     WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> groupedByRoute =
+       inputStream.accumulateByKey(new Group<StationSpeed>(), stampWithReadingTime, name("GroupByKey"));
+
+     // Analyze 'slowdown' over the readings of each route.
+     WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> routeStats =
+       groupedByRoute.flatMap(new GatherStats(), name("GatherStats"));
+
+     // Format the results for writing to the JDBC table.
+     WindowedStream<OutputPojo> formatted = routeStats.map(new FormatStatsFn(), name("FormatStatsFn"));
+     return formatted;
+   }
+ }
+
+
+ /**
+ * Parses the average-speed column (index 3) of a split input record.
+ *
+ * Absence is detected with explicit null checks instead of catching {@link NullPointerException},
+ * so an unrelated NPE can no longer be mistaken for "no value".
+ *
+ * @param inputItems the comma-split fields of one record
+ * @return the speed as a {@code Double}, or {@code null} when {@code inputItems} is {@code null},
+ *         the column lookup yields {@code null}, or the text is not a valid double
+ */
+ private static Double tryParseAvgSpeed(String[] inputItems)
+ {
+   if (inputItems == null) {
+     return null;
+   }
+   String rawSpeed = tryParseString(inputItems, 3);
+   if (rawSpeed == null) {
+     return null;
+   }
+   try {
+     return Double.parseDouble(rawSpeed);
+   } catch (NumberFormatException e) {
+     // Malformed number: treated the same as a missing value.
+     return null;
+   }
+ }
+
+ /**
+ * Looks up column 2 of a split input record via {@code tryParseString}; callers in this file
+ * treat that column as the station type (e.g. "ML").
+ */
+ private static String tryParseStationType(String[] inputItems)
+ {
+ return tryParseString(inputItems, 2);
+ }
+
+ /**
+ * Looks up column 1 of a split input record via {@code tryParseString}; callers in this file
+ * treat that column as the station id.
+ */
+ private static String tryParseStationId(String[] inputItems)
+ {
+ return tryParseString(inputItems, 1);
+ }
+
+ /**
+ * Looks up column 0 of a split input record via {@code tryParseString}; callers in this file
+ * treat that column as the record's timestamp, still in text form.
+ */
+ private static String tryParseTimestamp(String[] inputItems)
+ {
+ return tryParseString(inputItems, 0);
+ }
+
+ /**
+ * Returns the element of {@code inputItems} at {@code index}, or {@code null} when no such element exists.
+ *
+ * @param inputItems the comma-split fields of one record; {@code null} is treated as "no fields"
+ * @param index zero-based column to read
+ * @return the field text, or {@code null} if {@code index} is outside the array
+ */
+ private static String tryParseString(String[] inputItems, int index)
+ {
+   // A valid index is strictly smaller than the length. The earlier 'length >= index' test let
+   // index == length through and failed with ArrayIndexOutOfBoundsException instead of returning null.
+   if (inputItems == null || index < 0 || index >= inputItems.length) {
+     return null;
+   }
+   return inputItems[index];
+ }
+
+ /**
+ * Builds the hard-wired lookup from sensor station ID to the small San Diego 'route' it belongs to.
+ *
+ * @return a new map with three station ids mapped onto the routes "SDRoute1" and "SDRoute2"
+ */
+ private static Map<String, String> buildStationInfo()
+ {
+   Map<String, String> routeByStation = new Hashtable<>();
+   // from freeway 805 S
+   routeByStation.put("1108413", "SDRoute1");
+   // from freeway 78 E
+   routeByStation.put("1108699", "SDRoute2");
+   routeByStation.put("1108702", "SDRoute2");
+   return routeByStation;
+ }
+
+ /**
+ * A dummy generator to generate some traffic information.
+ *
+ * Each call to {@link #emitTuples()} emits one record per (station type, station id) combination in the
+ * form {@code time,stationID,stationType,speed}. The speed is a random value in [ave, ave + 20) and the
+ * time is the setup time plus a random offset below {@code RAND_RANGE}. The generator pauses 50 ms
+ * after every record. A static counter tracks how many records have been emitted since the last
+ * {@link #setup(Context.OperatorContext)}.
+ */
+ public static class InfoGen extends BaseOperator implements InputOperator
+ {
+   public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+   private String[] stationTypes = new String[]{"ML", "BL", "GL"};
+   private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
+   private double ave = 55.0;
+   private long timestamp;
+   private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+   private static int tupleCount = 0;
+
+   /** Returns the number of records emitted since the last call to setup. */
+   public static int getTupleCount()
+   {
+     return tupleCount;
+   }
+
+   /** Resets the emitted-record counter and captures the current wall-clock time as the base timestamp. */
+   @Override
+   public void setup(Context.OperatorContext context)
+   {
+     tupleCount = 0;
+     timestamp = System.currentTimeMillis();
+   }
+
+   /**
+    * Emits one record for every station type / station id pair, sleeping 50 ms after each.
+    * If the thread is interrupted while sleeping, the interrupt status is restored and the
+    * current batch is abandoned so the caller can react to the interruption.
+    */
+   @Override
+   public void emitTuples()
+   {
+     for (String stationType : stationTypes) {
+       for (int stationID : stationIDs) {
+         double speed = Math.random() * 20 + ave;
+         long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
+         try {
+           output.emit(time + "," + stationID + "," + stationType + "," + speed);
+           tupleCount++;
+
+           Thread.sleep(50);
+         } catch (InterruptedException e) {
+           // Do not swallow the interruption: put the flag back and stop emitting.
+           Thread.currentThread().interrupt();
+           return;
+         } catch (Exception ignored) {
+           // Best effort, as before: a failure on one record does not stop the generator.
+         }
+       }
+     }
+   }
+ }
+
+ /**
+ * Assembles the application with the high-level stream API and populates the given DAG.
+ *
+ * Pipeline, in order: InfoGen input, ExtractTimestamps, ExtractStationSpeedFn, a sliding time window
+ * (WINDOW_DURATION long, sliding every WINDOW_SLIDE_EVERY, both in minutes) with early firings every
+ * 5000 ms and accumulating fired panes, the TrackSpeed composite transform, console print, and finally
+ * the Collector operator.
+ *
+ * @param dag the DAG that the stream is populated into
+ * @param conf the configuration supplied to the application; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ InfoGen infoGen = new InfoGen();
+ Collector collector = new Collector();
+
+ // Create a stream from the input operator.
+ ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
+
+ // Extract the timestamp from the input and wrap it into a TimestampedTuple.
+ .map(new ExtractTimestamps(), name("ExtractTimestamps"));
+
+ stream
+ // Extract the average speed of a station.
+ .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
+
+ // Apply window and trigger option.
+ .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
+
+ // Apply TrackSpeed composite transformation to compute the route information.
+ .addCompositeStreams(new TrackSpeed())
+
+ // print the result to console.
+ .print()
+ // Hand every result to the Collector operator, then build the DAG from the assembled stream.
+ .endWith(collector, collector.input, name("Collector"))
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
new file mode 100644
index 0000000..ecad622
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Auto Complete Hashtag Demo with real-time Twitter input. In order to run this application, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter that information
+ * accordingly in /resources/META-INF/properties.xml.
+ *
+ * Authentication requires the following four pieces of information:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ */
+@ApplicationAnnotation(name = "TwitterAutoComplete")
+public class TwitterAutoComplete implements StreamingApplication
+{
+ /**
+ * Helper for checking whether every character of a string can be encoded as US-ASCII.
+ */
+ public static class StringUtils
+ {
+   static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
+
+   /**
+    * Tells whether the shared US-ASCII encoder can encode the whole string.
+    *
+    * The method is synchronized because a {@link CharsetEncoder} instance is not safe for use by
+    * multiple concurrent threads, and this one is shared through a static field.
+    *
+    * @param v the text to check
+    * @return {@code true} if the encoder reports that {@code v} can be encoded
+    */
+   public static synchronized boolean isAscii(String v)
+   {
+     return encoder.canEncode(v);
+   }
+ }
+
+ /**
+ * FlatMap function to extract all hashtags from a tweet in text form.
+ *
+ * A hashtag is a '#' followed by one or more non-whitespace characters; the leading '#' is removed
+ * from each emitted value. A tweet without hashtags yields an empty result.
+ */
+ private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+ {
+   // Compiled once and reused: Pattern is immutable, whereas compiling per tweet repeats the work.
+   private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+   @Override
+   public Iterable<String> f(String input)
+   {
+     List<String> result = new LinkedList<>();
+     Matcher m = HASHTAG.matcher(input);
+     while (m.find()) {
+       // Drop the leading '#'.
+       result.add(m.group().substring(1));
+     }
+     return result;
+   }
+ }
+
+ /**
+ * Composite transform that expands every CompletionCandidate into (prefix, candidate) pairs and then
+ * accumulates the pairs per prefix with a TopNByKey configured with N = candidatesPerPrefix.
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ /**
+ * @param candidatesPerPrefix value passed to TopNByKey.setN
+ * @param minPrefix shortest prefix length handed to AllPrefixes
+ */
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+ WindowedStream<CompletionCandidate> input)
+ {
+ TopNByKey topNByKey = new TopNByKey();
+ topNByKey.setN(candidatesPerPrefix);
+ return input
+ // The explicit type arguments pin flatMap's result type; the longest prefix length is hard-coded to 3 here.
+ .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
+ .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+ {
+ // Every pair is wrapped in a tuple assigned to the global window.
+ // TODO: Should be removed after Auto-wrapping is supported.
+ return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+ }
+ }, name("TopNByKey"));
+ }
+ }
+
+ /**
+ * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+ * KeyValPairs of the prefix and the CompletionCandidate
+ */
+ private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+ {
+ private final int minPrefix;
+ private final int maxPrefix;
+
+ public AllPrefixes()
+ {
+ this(0, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix)
+ {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix, int maxPrefix)
+ {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+
+ @Override
+ public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+ {
+ List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+ String word = input.getValue();
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+ result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A Composite stream transform that takes as input a stream of tokens and returns
+ * the most common tokens per prefix.
+ *
+ * The tokens are counted per key, each count is turned into a CompletionCandidate, and the candidates
+ * are handed to ComputeTopFlat with a minimum prefix length of 1.
+ */
+ public static class ComputeTopCompletions
+ extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ // Stored by the constructor but not read anywhere in compose().
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ /**
+ * Static factory for this transform.
+ *
+ * @param candidatesPerPrefix number of candidates passed on to ComputeTopFlat
+ * @param recursive stored on the instance; compose() does not consult it
+ */
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+ {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+ {
+
+ ApexStream<CompletionCandidate> candidates = inputStream
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ // Each token contributes a count of one under its own text as the key.
+ return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+ }
+ }, name("Hashtag Count")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+ {
+ @Override
+ public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ // (token, count) pair -> CompletionCandidate(token, count).
+ return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+ }
+ }, name("KeyValPair to CompletionCandidate"));
+
+ return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+
+ }
+ }
+
+ /**
+ * FilterFunction to filter out tweets with non-ASCII characters.
+ */
+ static class ASCIIFilter implements Function.FilterFunction<String>
+ {
+ @Override
+ public boolean f(String input)
+ {
+ // The verdict is exactly what StringUtils.isAscii reports for the tweet text.
+ return StringUtils.isAscii(input);
+ }
+ }
+
+ /**
+ * Populate the dag with High-Level API.
+ *
+ * Pipeline, in order: TwitterSampleInput text port, ASCIIFilter, ExtractHashtags, a global window with
+ * accumulating fired panes and early firings every 10 seconds, ComputeTopCompletions.top(10, true),
+ * and console print.
+ *
+ * @param dag the DAG that the assembled stream is populated into
+ * @param conf the configuration supplied to the application; not read by this method
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TwitterSampleInput input = new TwitterSampleInput();
+
+ WindowOption windowOption = new WindowOption.GlobalWindow();
+
+ // Tweets -> ASCII-only tweets -> individual hashtags (without the leading '#').
+ ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
+ .filter(new ASCIIFilter(), name("ACSII Filter"))
+ .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
+
+ ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
+ tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
+ .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
+
+ s.populateDag(dag);
+ }
+}