You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/19 14:54:14 UTC
[3/3] apex-malhar git commit: Merge commit 'refs/pull/480/head' of
https://github.com/apache/apex-malhar
Merge commit 'refs/pull/480/head' of https://github.com/apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a017dfaa
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a017dfaa
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a017dfaa
Branch: refs/heads/master
Commit: a017dfaa45e74e270b4d5619f83994388ed6dc09
Parents: a1c319c 65488fd
Author: Thomas Weise <th...@apache.org>
Authored: Sun Mar 19 07:39:52 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Mar 19 07:39:52 2017 -0700
----------------------------------------------------------------------
.../malhar/stream/sample/MinimalWordCount.java | 2 +-
.../malhar/stream/sample/WindowedWordCount.java | 2 +-
.../stream/sample/complete/AutoComplete.java | 2 +-
.../sample/complete/StreamingWordExtract.java | 2 +-
.../sample/complete/TopWikipediaSessions.java | 2 +-
.../stream/sample/complete/TrafficRoutes.java | 2 +-
.../sample/complete/TwitterAutoComplete.java | 2 +-
.../sample/cookbook/CombinePerKeyExamples.java | 2 +-
.../stream/sample/cookbook/DeDupExample.java | 2 +-
.../sample/cookbook/MaxPerKeyExamples.java | 2 +-
.../stream/sample/cookbook/TriggerExample.java | 2 +-
.../lib/function/AnnonymousClassModifier.java | 134 +++++++
.../apex/malhar/lib/function/Function.java | 87 +++++
.../malhar/lib/function/FunctionOperator.java | 378 +++++++++++++++++++
.../malhar/lib/utils/ByteArrayClassLoader.java | 54 +++
.../apache/apex/malhar/lib/utils/TupleUtil.java | 46 +++
.../apex/malhar/stream/api/ApexStream.java | 2 +-
.../apex/malhar/stream/api/WindowedStream.java | 5 +-
.../malhar/stream/api/function/Function.java | 88 -----
.../malhar/stream/api/impl/ApexStreamImpl.java | 6 +-
.../stream/api/impl/ApexWindowedStreamImpl.java | 2 +-
.../api/operator/AnnonymousClassModifier.java | 134 -------
.../api/operator/ByteArrayClassLoader.java | 54 ---
.../stream/api/operator/FunctionOperator.java | 378 -------------------
.../apex/malhar/stream/api/util/TupleUtil.java | 46 ---
.../FunctionOperator/FunctionOperatorTest.java | 4 +-
.../stream/sample/ApplicationWithStreamAPI.java | 2 +-
.../LocalTestWithoutStreamApplication.java | 2 +-
.../apex/malhar/stream/sample/MyStream.java | 2 +-
.../apex/malhar/stream/sample/MyStreamTest.java | 2 +-
.../stream/sample/WordCountWithStreamAPI.java | 2 +-
31 files changed, 723 insertions(+), 727 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
index 327c882,0000000..160175f
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@@ -1,128 -1,0 +1,128 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam MinimalWordCount Example.
+ *
+ * <p>Reads the files under {@code ./src/test/resources/wordcount}, splits every line into words, counts the words
+ * in a global window, prints the counts and hands them to a {@link Collector}.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MinimalWordCount")
+public class MinimalWordCount implements StreamingApplication
+{
+  /**
+   * Sink operator that keeps the most recent count received for every word in static state.
+   * {@link #isDone()} becomes true once a tuple for the word "bye" has arrived.
+   */
+  public static class Collector extends BaseOperator
+  {
+    static Map<String, Long> result;
+    private static boolean done = false;
+
+    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Long> countedWord)
+      {
+        String word = countedWord.getKey();
+        if (word.equals("bye")) {
+          done = true;
+        }
+        result.put(word, countedWord.getValue());
+      }
+    };
+
+    /**
+     * @return whether the "bye" word has reached this collector since the last {@code setup}
+     */
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    /** Clears the completion flag and starts a fresh, empty result map. */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      result = new HashMap<>();
+    }
+  }
+
+  /**
+   * Builds the word-count pipeline with the High-Level API and populates {@code dag} with it.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+
+    // Split a line on every run of characters that are neither ASCII letters nor apostrophes.
+    Function.FlatMapFunction<String, String> splitIntoWords = new Function.FlatMapFunction<String, String>()
+    {
+      @Override
+      public Iterable<String> f(String line)
+      {
+        return Arrays.asList(line.split("[^a-zA-Z']+"));
+      }
+    };
+
+    // Key every word as a (word, 1L) pair for countByKey.
+    Function.ToKeyValue<String, String, Long> toWordAndOne = new Function.ToKeyValue<String, String, Long>()
+    {
+      @Override
+      public Tuple<KeyValPair<String, Long>> f(String word)
+      {
+        return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(word, 1L));
+      }
+    };
+
+    // Strip the window wrapper so only the (word, count) pair travels downstream.
+    Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>> unwrapWindow =
+        new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
+    {
+      @Override
+      public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> windowed)
+      {
+        return windowed.getValue();
+      }
+    };
+
+    // Source: read the input folder line by line.
+    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+        .flatMap(splitIntoWords, name("ExtractWords"))
+        // Global window, accumulating fired panes, early firings via withEarlyFiringsAtEvery(1).
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        .countByKey(toWordAndOne, name("countByKey"))
+        .map(unwrapWindow, name("FormatResults"))
+        .print(name("console"))
+        .endWith(collector, collector.input, name("Collector"))
+        .populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index 5b83bd0,0000000..6e57bfd
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@@ -1,290 -1,0 +1,290 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam WindowedWordCount Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "WindowedWordCount")
+public class WindowedWordCount implements StreamingApplication
+{
+ static final int WINDOW_SIZE = 1; // Default window duration in minutes
+
+ /**
+ * A input operator that reads from and output a file line by line to downstream with a time gap between
+ * every two lines.
+ */
+ public static class TextInput extends BaseOperator implements InputOperator
+ {
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+ private boolean done = false;
+
+ private transient BufferedReader reader;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ done = false;
+ initReader();
+ }
+
+ private void initReader()
+ {
+ try {
+ InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt");
+ reader = new BufferedReader(new InputStreamReader(resourceStream));
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ IOUtils.closeQuietly(reader);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (!done) {
+ try {
+ String line = reader.readLine();
+ if (line == null) {
+ done = true;
+ reader.close();
+ } else {
+ this.output.emit(line);
+ }
+ Thread.sleep(50);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+ private static boolean done = false;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ done = false;
+ }
+
+ public static boolean isDone()
+ {
+ return done;
+ }
+
+ public static Map<KeyValPair<Long, String>, Long> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
+ {
+ @Override
+ public void process(PojoEvent tuple)
+ {
+ result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
+ if (tuple.getWord().equals("bye")) {
+ done = true;
+ }
+ }
+ };
+ }
+
+ /**
+ * A Pojo Tuple class used for outputting result to JDBC.
+ */
+ public static class PojoEvent
+ {
+ private String word;
+ private long count;
+ private long timestamp;
+
+ @Override
+ public String toString()
+ {
+ return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
+ }
+
+ public String getWord()
+ {
+ return word;
+ }
+
+ public void setWord(String word)
+ {
+ this.word = word;
+ }
+
+ public long getCount()
+ {
+ return count;
+ }
+
+ public void setCount(long count)
+ {
+ this.count = count;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+ }
+
+ /**
+ * A map function that wrap the input string with a random generated timestamp.
+ */
+ public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+ {
+ private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+ private final Long minTimestamp;
+
+ AddTimestampFn()
+ {
+ this.minTimestamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public Tuple.TimestampedTuple<String> f(String input)
+ {
+ // Generate a timestamp that falls somewhere in the past two hours.
+ long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+ long randomTimestamp = minTimestamp + randMillis;
+
+ return new Tuple.TimestampedTuple<>(randomTimestamp, input);
+ }
+ }
+
+ /** A MapFunction that converts a Word and Count into a PojoEvent. */
+ public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
+ {
+ @Override
+ public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ PojoEvent row = new PojoEvent();
+ row.setTimestamp(input.getTimestamp());
+ row.setCount(input.getValue().getValue());
+ row.setWord(input.getValue().getKey());
+ return row;
+ }
+ }
+
+ /**
+ * Populate dag with High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TextInput input = new TextInput();
+ Collector collector = new Collector();
+
+ // Create stream from the TextInput operator.
+ ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
+
+ // Extract all the words from the input line of text.
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+ }
+ }, name("ExtractWords"))
+
+ // Wrap the word with a randomly generated timestamp.
+ .map(new AddTimestampFn(), name("AddTimestampFn"));
+
+
+ // apply window and trigger option.
+ // TODO: change trigger option to atWaterMark when available.
+ WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
+ .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
+ new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
+
+
+ WindowedStream<PojoEvent> wordCounts =
+ // Perform a countByKey transformation to count the appearance of each word in every time window.
+ windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+ {
+ return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
+ new KeyValPair<String, Long>(input.getValue(), 1L));
+ }
+ }, name("count words"))
+
+ // Format the output and print out the result.
+ .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
+
+ wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 2db59b6,0000000..571a25f
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@@ -1,324 -1,0 +1,324 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ * This application mirrors TwitterAutoComplete, except that it
+ * reads from a file; it is mainly intended for local testing.
+ *
+ * <p>Hashtags are grouped in a global window with accumulating fired panes and early firings configured via
+ * {@code withEarlyFiringsAtEvery(1)}; the per-prefix results are printed and sent to a {@link Collector}.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "AutoComplete")
+public class AutoComplete implements StreamingApplication
+{
+
+  /**
+   * A dummy Twitter input operator. It reads the classpath resource {@code /sampletweets.txt} and emits it line by
+   * line, sleeping 50 ms after each read attempt.
+   */
+  public static class TweetsInput extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done;
+
+    private transient BufferedReader reader;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+
+    /**
+     * Opens {@link #reader} on {@code /sampletweets.txt}, decoding it as UTF-8.
+     *
+     * @throws RuntimeException if the resource is missing or cannot be opened
+     */
+    private void initReader()
+    {
+      try {
+        InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
+        if (resourceStream == null) {
+          // getResourceAsStream reports a missing resource as null; fail with a clear message instead of an NPE.
+          throw new IllegalStateException("Resource not found: /sampletweets.txt");
+        }
+        // Decode explicitly instead of relying on the platform default charset.
+        reader = new BufferedReader(new InputStreamReader(resourceStream, "UTF-8"));
+      } catch (Exception ex) {
+        throw Throwables.propagate(ex);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    /**
+     * Emits the next line, or, at end of input, marks this operator done and closes the reader.
+     * Sleeps 50 ms after each read attempt while not done.
+     */
+    @Override
+    public void emitTuples()
+    {
+      if (!done) {
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            done = true;
+            reader.close();
+          } else {
+            this.output.emit(line);
+          }
+          Thread.sleep(50);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        } catch (InterruptedException e) {
+          // Deliberately not fatal for the operator, but restore the interrupt status instead of discarding it
+          // so the owner of this thread can still observe the interruption.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Sink operator that stores the latest completion list received for every prefix in static state.
+   * {@link #isDone()} becomes true once the prefix "yarn" has arrived.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+    private static boolean done = false;
+
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+    }
+
+    public static Map<String, List<CompletionCandidate>> getResult()
+    {
+      return result;
+    }
+
+    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
+      {
+        if (tuple.getValue().getKey().equals("yarn")) {
+          done = true;
+        }
+        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+      }
+    };
+  }
+
+  /**
+   * FlatMap function that extracts every hashtag (without its leading '#') from the text of a tweet.
+   */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    /** A '#' followed by one or more non-whitespace characters; compiled once instead of once per tweet. */
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive.
+   *
+   * <p>Expands every {@link CompletionCandidate} into (prefix, candidate) pairs via {@link AllPrefixes} and
+   * accumulates them per prefix with {@code TopNByKey} (defined elsewhere).
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    // NOTE(review): candidatesPerPrefix is stored but not read in compose(); TopNByKey is created with its
+    // default settings. Confirm whether it should be configured with this value.
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+        WindowedStream<CompletionCandidate> input)
+    {
+      return input
+          .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
+          .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
+            CompletionCandidate>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+            {
+              // TODO: Should be removed after Auto-wrapping is supported.
+              return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+            }
+          });
+    }
+  }
+
+  /**
+   * FlatMap function that emits, for every prefix length in [minPrefix, min(word length, maxPrefix)] of the
+   * input CompletionCandidate's value, a KeyValPair of the lower-cased prefix and the CompletionCandidate.
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    public AllPrefixes()
+    {
+      this(0, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+      String word = input.getValue();
+      int longestPrefix = Math.min(word.length(), maxPrefix);
+      for (int i = minPrefix; i <= longestPrefix; i++) {
+        // Lower-case with Locale.ROOT so prefix keys do not depend on the JVM's default locale.
+        result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(Locale.ROOT), input));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    // NOTE(review): 'recursive' is stored but not read anywhere in this class.
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    /**
+     * Counts every token, converts each (token, count) pair into a {@link CompletionCandidate} and computes the
+     * completions per prefix with {@link ComputeTopFlat}.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("countByKey"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("ToCompletionCandidate"));
+
+      // Use the configured candidatesPerPrefix rather than a hard-coded 10.
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+    }
+  }
+
+  /**
+   * Populates the DAG using the High-Level API.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TweetsInput input = new TweetsInput();
+    Collector collector = new Collector();
+
+    WindowOption windowOption = new WindowOption.GlobalWindow();
+
+    ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
+        .flatMap(new ExtractHashtags());
+
+    tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
+        .endWith(collector, collector.input, name("collector"))
+        .populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
index 07f01d0,0000000..b5e491e
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@@ -1,162 -1,0 +1,162 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static java.sql.Types.VARCHAR;
+
+/**
+ * Beam StreamingWordExtract Example.
+ *
+ * <p>Reads the files under {@code ./src/test/resources/data}, splits the lines into words, drops empty strings,
+ * upper-cases every word and inserts each one as a {@code PojoEvent} row into the JDBC table {@code TestTable}.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "StreamingWordExtract")
+public class StreamingWordExtract implements StreamingApplication
+{
+  // Static progress counters, updated without synchronization by the functions below.
+  // NOTE(review): presumably read by tests through the instance getters; confirm before relying on them elsewhere.
+  private static int wordCount = 0; // Number of tokens extracted so far (empty tokens included; they are filtered later).
+  private static int entriesMapped = 0; // Number of words mapped to PojoEvent entries so far.
+
+  /** @return the number of tokens produced by {@link ExtractWords} so far */
+  public int getWordCount()
+  {
+    return wordCount;
+  }
+
+  /** @return the number of entries produced by {@link PojoMapper} so far */
+  public int getEntriesMapped()
+  {
+    return entriesMapped;
+  }
+
+  /**
+   * A FlatMapFunction that tokenizes lines of text into individual words, splitting on every run of characters
+   * that are not ASCII letters, digits or apostrophes. The number of tokens is added to the word counter.
+   */
+  public static class ExtractWords implements Function.FlatMapFunction<String, String>
+  {
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
+      wordCount += result.size();
+      return result;
+    }
+  }
+
+
+  /**
+   * A MapFunction that uppercases a word.
+   */
+  public static class Uppercase implements Function.MapFunction<String, String>
+  {
+    @Override
+    public String f(String input)
+    {
+      return input.toUpperCase();
+    }
+  }
+
+
+  /**
+   * A filter function that keeps only non-empty strings.
+   */
+  public static class EmptyStringFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      return !input.isEmpty();
+    }
+  }
+
+
+  /**
+   * A map function that wraps the result string in a {@code PojoEvent} (as its stringValue) and increments the
+   * mapped-entries counter.
+   */
+  public static class PojoMapper implements Function.MapFunction<String, Object>
+  {
+
+    @Override
+    public Object f(String input)
+    {
+      PojoEvent pojo = new PojoEvent();
+      pojo.setStringValue(input);
+      entriesMapped++;
+      return pojo;
+    }
+  }
+
+  /**
+   * Builds the field infos for the {@link JdbcPOJOInsertOutputOperator}: the POJO property {@code stringValue}
+   * is written to the VARCHAR column {@code STRINGVALUE}.
+   */
+  private static List<JdbcFieldInfo> addFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
+    fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+    return fieldInfos;
+  }
+
+  /**
+   * Populates the DAG using the High-Level API.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addFieldInfos());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+    jdbcOutput.setTablename("TestTable");
+
+    // Create a stream reading from a folder.
+    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
+
+    // Extract all the words from the input line of text.
+    stream.flatMap(new ExtractWords())
+
+        // Filter out the empty strings.
+        .filter(new EmptyStringFilter())
+
+        // Change every word to uppercase.
+        .map(new Uppercase())
+
+        // Map each resulting word to a POJO entry.
+        .map(new PojoMapper())
+
+        // Output the entries to JdbcOutput and insert them into a table.
+        .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
+
+    stream.populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index 68ec733,0000000..b2b9ae4
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@@ -1,347 -1,0 +1,347 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TopWikipediaSessions Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TopWikipediaSessions")
+public class TopWikipediaSessions implements StreamingApplication
+{
+  /**
+   * A generator that emits (user, edit time) pairs. Each user name is picked at random from a fixed
+   * list, and each edit time is a random timestamp up to 365 days after the operator's setup time.
+   * The operator sleeps for the configured spin interval after every tuple.
+   */
+  public static class SessionGen extends BaseOperator implements InputOperator
+  {
+    private String[] names = new String[]{"user1", "user2", "user3", "user4"};
+    public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
+
+    private static final Duration RAND_RANGE = Duration.standardDays(365);
+    private Long minTimestamp;
+    private long sleepTime;
+    // Tuples emitted since the last setup(); static and exposed via getTupleCount().
+    private static int tupleCount = 0;
+    // Reused across tuples instead of allocating a new Random per call; re-created in setup().
+    private transient Random random;
+
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+
+    private String randomName(String[] names)
+    {
+      return names[random.nextInt(names.length)];
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      tupleCount = 0;
+      random = new Random();
+      minTimestamp = System.currentTimeMillis();
+      sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+      output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
+      tupleCount++;
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so code further up the call stack can observe it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+ public static class Collector extends BaseOperator
+ {
+ private final int resultSize = 5;
+ private static List<List<TempWrapper>> result = new ArrayList<>();
+
+ public static List<List<TempWrapper>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
+ {
+ if (result.size() == resultSize) {
+ result.remove(0);
+ }
+ result.add(tuple.getValue());
+ }
+ };
+ }
+
+
+  /**
+   * Converts an upstream (user, edit time) pair into a {@link Tuple.TimestampedTuple} whose value is
+   * the user name and whose timestamp is the edit time.
+   */
+  static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
+  {
+    @Override
+    public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
+    {
+      // The tuple's implicit timestamp field is what the windowing step uses.
+      return new Tuple.TimestampedTuple<>(input.getValue(), input.getKey());
+    }
+  }
+
+  /**
+   * Counts the edits in each user session. A session is a run of edits in which each edit is
+   * separated from the next by less than one hour.
+   */
+  static class ComputeSessions
+      extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
+    {
+      // Key every edit by user name with a count of one, keeping the edit's timestamp.
+      Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long> oneEditPerUser =
+          new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+          {
+            @Override
+            public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+            {
+              KeyValPair<String, Long> userEdit = new KeyValPair<String, Long>(input.getValue(), 1L);
+              return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), userEdit);
+            }
+          };
+
+      // Chunk the stream into session windows with a one-hour gap; accumulate fired panes.
+      WindowOption sessionWindows = new WindowOption.SessionWindows(Duration.standardHours(1));
+      TriggerOption trigger = new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1);
+
+      return inputStream
+          .window(sessionWindows, trigger)
+          .countByKey(oneEditPerUser, name("ComputeSessions"));
+    }
+  }
+
+  /**
+   * Orders {@link TempWrapper} objects by the {@code Long} value of their wrapped key/value pair,
+   * in ascending order.
+   */
+  public static class Comp implements Comparator<TempWrapper>
+  {
+    @Override
+    public int compare(TempWrapper o1, TempWrapper o2)
+    {
+      Long left = o1.getValue().getValue();
+      Long right = o2.getValue().getValue();
+      return left.compareTo(right);
+    }
+  }
+
+  // TODO: Need to revisit and change back to using TimestampedTuple.
+  /**
+   * A function that returns the timestamp carried by a {@link TempWrapper}.
+   */
+  public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
+  {
+    @Override
+    public Long apply(@Nullable TempWrapper input)
+    {
+      // NOTE(review): despite the @Nullable parameter, a null input is dereferenced here and
+      // throws NullPointerException.
+      TempWrapper wrapper = input;
+      return wrapper.getTimestamp();
+    }
+  }
+
+  /**
+   * Temporary holder that pairs a key/value pair with a timestamp. It stands in for a timestamped
+   * tuple because a type conflict could not be resolved when calling accumulate(). It can be
+   * removed once that issue is resolved.
+   */
+  public static class TempWrapper
+  {
+    private KeyValPair<String, Long> value;
+    private Long timestamp;
+
+    /** Creates an empty wrapper; both fields stay null. */
+    public TempWrapper()
+    {
+    }
+
+    public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
+    {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+
+    public KeyValPair<String, Long> getValue()
+    {
+      return value;
+    }
+
+    public void setValue(KeyValPair<String, Long> value)
+    {
+      this.value = value;
+    }
+
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString()
+    {
+      return value + " - " + timestamp;
+    }
+  }
+
+ /**
+ * Computes the longest session ending in each month, in this case we use 30 days to represent every month.
+ */
+ private static class TopPerMonth
+ extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+ {
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
+ {
+ TopN<TempWrapper> topN = new TopN<>();
+ topN.setN(10);
+ topN.setComparator(new Comp());
+
+ return inputStream
+
+ // Map the input WindowedTuple to a TempWrapper object.
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
+ {
+ @Override
+ public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ Window window = input.getWindows().iterator().next();
+ return new TempWrapper(input.getValue(), window.getBeginTimestamp());
+ }
+ }, name("TempWrapper"))
+
+ // Apply window and trigger option again, this time chuck the stream into fixed time windows.
+ .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
+
+ // Compute the top 10 user-sessions with most number of edits.
+ .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
+ }
+ }
+
+  /**
+   * A map function that combines the user and the session window into the key. The output key is
+   * {@code "user : windowBegin : windowDurationMillis"}, and the output value is the original value
+   * (the number of edits in that session).
+   */
+  static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
+  {
+    @Override
+    public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      Window window = input.getWindows().iterator().next();
+      KeyValPair<String, Long> userCount = input.getValue();
+      String sessionKey = userCount.getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis();
+      KeyValPair<String, Long> sessionCount = new KeyValPair<String, Long>(sessionKey, userCount.getValue());
+      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, sessionCount);
+    }
+  }
+
+  /**
+   * A flat-map function that turns each result list into readable lines of the form
+   * {@code "session + count : windowBegin"}, one line per entry.
+   */
+  static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
+  {
+    @Override
+    public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
+    {
+      List<String> lines = new ArrayList<>();
+      for (TempWrapper entry : input.getValue()) {
+        KeyValPair<String, Long> sessionCount = entry.getValue();
+        String session = sessionCount.getKey();
+        long count = sessionCount.getValue();
+        long windowBegin = input.getWindows().iterator().next().getBeginTimestamp();
+        lines.add(session + " + " + count + " : " + windowBegin);
+      }
+      return lines;
+    }
+  }
+
+  /**
+   * A composite transform that computes the top Wikipedia sessions. It runs four stages:
+   * <ol>
+   *   <li>timestamp each (user, edit time) pair;</li>
+   *   <li>count edits per user session;</li>
+   *   <li>fold the session window into the key;</li>
+   *   <li>keep the top sessions per 30-day window.</li>
+   * </ol>
+   */
+  public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
+    {
+      // Stage 1: use the edit time as the tuple timestamp.
+      ApexStream<Tuple.TimestampedTuple<String>> userEdits =
+          inputStream.map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"));
+
+      // Stages 2-4: sessionize and count, re-key by session, then rank per window.
+      return userEdits
+          .addCompositeStreams(new ComputeSessions())
+          .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
+          .addCompositeStreams(new TopPerMonth());
+    }
+  }
+
+  /**
+   * Wires the application. A {@link SessionGen} source feeds {@link ComputeTopSessions}, and the
+   * results are printed to the console and delivered to a {@link Collector}.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SessionGen sg = new SessionGen();
+    Collector collector = new Collector();
+
+    ApexStream<KeyValPair<String, Long>> edits = StreamFactory.fromInput(sg, sg.output, name("sessionGen"));
+    edits.addCompositeStreams(new ComputeTopSessions())
+        .print(name("console"))
+        .endWith(collector, collector.input, name("collector"))
+        .populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index e6a53d6,0000000..431263a
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@@ -1,523 -1,0 +1,523 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TrafficRoutes example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TrafficRoutes")
+public class TrafficRoutes implements StreamingApplication
+{
+ static Map<String, String> sdStations = buildStationInfo();
+ static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
+ static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
+
+  /**
+   * Holds a station reading's average speed together with the station ID and the reading's
+   * timestamp. Instances are ordered by timestamp, with null timestamps ordered first.
+   */
+  public static class StationSpeed implements Comparable<StationSpeed>
+  {
+    @Nullable
+    String stationId;
+    @Nullable
+    Double avgSpeed;
+    @Nullable
+    Long timestamp;
+
+    public StationSpeed() {}
+
+    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
+    {
+      this.stationId = stationId;
+      this.avgSpeed = avgSpeed;
+      this.timestamp = timestamp;
+    }
+
+    public void setAvgSpeed(@Nullable Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public void setStationId(@Nullable String stationId)
+    {
+      this.stationId = stationId;
+    }
+
+    public void setTimestamp(@Nullable Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Nullable
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public String getStationId()
+    {
+      return this.stationId;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return this.avgSpeed;
+    }
+
+    /**
+     * Compares by timestamp. The timestamp field is {@code @Nullable}, so null timestamps are
+     * ordered before non-null ones instead of failing on auto-unboxing.
+     */
+    @Override
+    public int compareTo(StationSpeed other)
+    {
+      Long mine = this.timestamp;
+      Long theirs = other.timestamp;
+      if (mine == null) {
+        return theirs == null ? 0 : -1;
+      }
+      if (theirs == null) {
+        return 1;
+      }
+      return Long.compare(mine, theirs);
+    }
+  }
+
+  /**
+   * Speed summary for one route: the route name, its average speed and whether a slowdown was
+   * detected.
+   */
+  static class RouteInfo
+  {
+    @Nullable
+    String route;
+    @Nullable
+    Double avgSpeed;
+    @Nullable
+    Boolean slowdownEvent;
+
+    /** Creates an empty instance; all fields stay null. */
+    public RouteInfo()
+    {
+    }
+
+    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
+    {
+      this.route = route;
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    /** @return the route name */
+    public String getRoute()
+    {
+      return route;
+    }
+
+    /** @return the average speed over the route */
+    public Double getAvgSpeed()
+    {
+      return avgSpeed;
+    }
+
+    /** @return whether a slowdown was detected on the route */
+    public Boolean getSlowdownEvent()
+    {
+      return slowdownEvent;
+    }
+  }
+
+  /**
+   * Wraps each raw comma-separated reading in a {@link Tuple.TimestampedTuple}. The timestamp is
+   * the reading's first field, parsed as a {@code long}.
+   */
+  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      long eventTime = Long.parseLong(tryParseTimestamp(input.split(",")));
+      return new Tuple.TimestampedTuple<>(eventTime, input);
+    }
+  }
+
+  /**
+   * Keeps only 'main line' readings from stations on the predefined routes. Each kept reading is
+   * emitted as (route name, station speed info); every other reading yields no output.
+   */
+  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
+  {
+
+    @Override
+    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
+    {
+      List<KeyValPair<String, StationSpeed>> speeds = new ArrayList<>();
+      String[] fields = input.getValue().split(",");
+
+      // For this analysis, use only 'main line' (ML) station types.
+      if (!"ML".equals(tryParseStationType(fields))) {
+        return speeds;
+      }
+      Double avgSpeed = tryParseAvgSpeed(fields);
+      String stationId = tryParseStationId(fields);
+      // Filter out everything but the hard-wired route stations.
+      if (avgSpeed == null || stationId == null || !sdStations.containsKey(stationId)) {
+        return speeds;
+      }
+      // Key the reading by the route the station belongs to.
+      StationSpeed stationSpeed = new StationSpeed(stationId, avgSpeed, input.getTimestamp());
+      speeds.add(new KeyValPair<>(sdStations.get(stationId), stationSpeed));
+      return speeds;
+    }
+  }
+
+  /**
+   * For one route and window, computes the average of the non-null station speeds and flags a
+   * slowdown. Readings are compared with the previous reading from the same station, in timestamp
+   * order. A slowdown is reported when the slowdowns are at least twice the speed-ups. No output
+   * is produced when there is no non-null speed.
+   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+   */
+  static class GatherStats
+      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
+  {
+    @Override
+    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
+    {
+      List<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> output = new ArrayList<>();
+      String route = input.getValue().getKey();
+
+      // Work on a copy sorted by the readings' embedded timestamps.
+      List<StationSpeed> readings = Lists.newArrayList(input.getValue().getValue());
+      Collections.sort(readings);
+
+      double totalSpeed = 0.0;
+      int samples = 0;
+      int speedups = 0;
+      int slowdowns = 0;
+      Map<String, Double> lastSpeedByStation = new HashMap<>();
+      for (StationSpeed reading : readings) {
+        Double speed = reading.getAvgSpeed();
+        if (speed == null) {
+          continue;
+        }
+        totalSpeed += speed;
+        samples++;
+        // put() returns the station's previous speed (or null for its first reading).
+        Double previous = lastSpeedByStation.put(reading.getStationId(), speed);
+        if (previous != null) {
+          if (previous < speed) {
+            speedups++;
+          } else {
+            slowdowns++;
+          }
+        }
+      }
+
+      if (samples == 0) {
+        // No average to compute.
+        return output;
+      }
+      boolean slowdownEvent = slowdowns >= 2 * speedups;
+      RouteInfo info = new RouteInfo(route, totalSpeed / samples, slowdownEvent);
+      output.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, info)));
+      return output;
+    }
+  }
+
+  /**
+   * Output bean for one route statistic, used for outputting results to JDBC. It holds the average
+   * speed, the slowdown flag, the route key and the timestamp.
+   */
+  static class OutputPojo
+  {
+    private Double avgSpeed;
+    private Boolean slowdownEvent;
+    private String key;
+    private Long timestamp;
+
+    /** Creates an empty record; all fields stay null. */
+    public OutputPojo()
+    {
+    }
+
+    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
+    {
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return avgSpeed;
+    }
+
+    public void setAvgSpeed(Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public Boolean getSlowdownEvent()
+    {
+      return slowdownEvent;
+    }
+
+    public void setSlowdownEvent(Boolean slowdownEvent)
+    {
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public String getKey()
+    {
+      return key;
+    }
+
+    public void setKey(String key)
+    {
+      this.key = key;
+    }
+
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString()
+    {
+      return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
+    }
+  }
+
+  /**
+   * Terminal operator that records each incoming {@link OutputPojo} in a static map readable via
+   * {@link #getResult()}. The map key is the (timestamp, route key) pair and the value is the
+   * (average speed, slowdown flag) pair; a later tuple with an equal key replaces the earlier entry.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
+
+    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
+    {
+      return result;
+    }
+
+    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
+    {
+      @Override
+      public void process(OutputPojo tuple)
+      {
+        KeyValPair<Long, String> timeAndKey = new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey());
+        KeyValPair<Double, Boolean> stats = new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent());
+        result.put(timeAndKey, stats);
+      }
+    };
+  }
+
+  /**
+   * Flattens a timestamped (route, route info) pair into an {@link OutputPojo}.
+   */
+  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
+    {
+      KeyValPair<String, RouteInfo> routeAndInfo = input.getValue();
+      RouteInfo info = routeAndInfo.getValue();
+      return new OutputPojo(info.getAvgSpeed(), info.getSlowdownEvent(), routeAndInfo.getKey(), input.getTimestamp());
+    }
+  }
+
+
+  /**
+   * Composite transform that analyses traffic slowdown per route. It groups the windowed station
+   * readings by route, computes the route statistics with {@link GatherStats}, and formats them as
+   * {@link OutputPojo}s for JDBC.
+   */
+  static class TrackSpeed extends
+      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
+  {
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
+    {
+      // Keep each (route, reading) pair as-is, timestamped with the reading's own timestamp.
+      Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed> byRoute =
+          new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
+            {
+              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
+            }
+          };
+
+      // GroupByKey: collect the list of all station readings for each route.
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> readingsByRoute =
+          inputStream.accumulateByKey(new Group<StationSpeed>(), byRoute, name("GroupByKey"));
+
+      // Analyse 'slowdown' per route, then format the statistics for the JDBC table.
+      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> routeStats =
+          readingsByRoute.flatMap(new GatherStats(), name("GatherStats"));
+      return routeStats.map(new FormatStatsFn(), name("FormatStatsFn"));
+    }
+  }
+
+
+ private static Double tryParseAvgSpeed(String[] inputItems)
+ {
+ try {
+ return Double.parseDouble(tryParseString(inputItems, 3));
+ } catch (NumberFormatException e) {
+ return null;
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ private static String tryParseStationType(String[] inputItems)
+ {
+ return tryParseString(inputItems, 2);
+ }
+
+ private static String tryParseStationId(String[] inputItems)
+ {
+ return tryParseString(inputItems, 1);
+ }
+
+ private static String tryParseTimestamp(String[] inputItems)
+ {
+ return tryParseString(inputItems, 0);
+ }
+
+ private static String tryParseString(String[] inputItems, int index)
+ {
+ return inputItems.length >= index ? inputItems[index] : null;
+ }
+
+  /**
+   * Builds the small hard-wired table of San Diego 'routes', mapping sensor station ID to route
+   * name.
+   *
+   * @return a new {@link Hashtable} from station ID to route name
+   */
+  private static Map<String, String> buildStationInfo()
+  {
+    String[][] table = {
+        {"1108413", "SDRoute1"}, // from freeway 805 S
+        {"1108699", "SDRoute2"}, // from freeway 78 E
+        {"1108702", "SDRoute2"}
+    };
+    Map<String, String> routeByStation = new Hashtable<String, String>();
+    for (String[] row : table) {
+      routeByStation.put(row[0], row[1]);
+    }
+    return routeByStation;
+  }
+
+  /**
+   * A dummy generator of traffic readings. Each call to {@link #emitTuples()} emits one line for
+   * every (station type, station ID) combination, formatted as
+   * {@code "timestamp,stationId,stationType,speed"}. The speed is random in [55, 75) and the
+   * timestamp is random within ten minutes after setup. The generator pauses 50 ms after each line.
+   */
+  public static class InfoGen extends BaseOperator implements InputOperator
+  {
+    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
+    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
+    private double ave = 55.0;
+    private long timestamp;
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    private static int tupleCount = 0;
+
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      tupleCount = 0;
+      timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      for (String stationType : stationTypes) {
+        for (int stationID : stationIDs) {
+          double speed = Math.random() * 20 + ave;
+          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
+          // Emit outside the try: failures here must not be silently swallowed.
+          output.emit(time + "," + stationID + "," + stationType + "," + speed);
+          tupleCount++;
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            // Stop this burst and restore the interrupt flag so the caller can observe it.
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Wires the application in five steps:
+   * <ol>
+   *   <li>generated readings are timestamped;</li>
+   *   <li>readings are reduced to main-line route speeds;</li>
+   *   <li>speeds are windowed into sliding time windows (early firings every 5 s, accumulating);</li>
+   *   <li>{@link TrackSpeed} analyses each route;</li>
+   *   <li>results are printed to the console and delivered to a {@link Collector}.</li>
+   * </ol>
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    InfoGen infoGen = new InfoGen();
+    Collector collector = new Collector();
+
+    WindowOption slidingWindows = new WindowOption.SlidingTimeWindows(
+        Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY));
+    TriggerOption trigger = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes();
+
+    // Source readings, each wrapped with the timestamp parsed from its first field.
+    ApexStream<Tuple.TimestampedTuple<String>> readings = StreamFactory
+        .fromInput(infoGen, infoGen.output, name("infoGen"))
+        .map(new ExtractTimestamps(), name("ExtractTimestamps"));
+
+    readings
+        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
+        .window(slidingWindows, trigger)
+        .addCompositeStreams(new TrackSpeed())
+        .print(name("console"))
+        .endWith(collector, collector.input, name("Collector"))
+        .populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
index 6332c66,0000000..cf52cb6
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@@ -1,254 -1,0 +1,254 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.joda.time.Duration;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Auto Complete Hashtag Example with real time twitter input. In order to run this application, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter that information
+ * in /resources/META-INF/properties.xml.
+ *
+ * The authentication requires the following 4 pieces of information:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TwitterAutoComplete")
+public class TwitterAutoComplete implements StreamingApplication
+{
+  /**
+   * Checks whether every character in a string can be encoded as US-ASCII.
+   */
+  public static class StringUtils
+  {
+    private static final Charset US_ASCII = Charset.forName("US-ASCII");
+
+    /**
+     * Returns whether {@code v} consists only of US-ASCII characters.
+     *
+     * <p>A new encoder is created on every call because {@link CharsetEncoder} instances are stateful
+     * and not safe for concurrent use; a single shared static encoder can make {@code canEncode}
+     * throw {@link IllegalStateException} when invoked from several threads at once.
+     *
+     * @param v the string to check
+     * @return true if every character of {@code v} is encodable as US-ASCII
+     */
+    public static boolean isAscii(String v)
+    {
+      CharsetEncoder encoder = US_ASCII.newEncoder();
+      return encoder.canEncode(v);
+    }
+  }
+
+  /**
+   * FlatMap function to extract all hashtags from a text form tweet.
+   */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    // A '#' followed by one or more non-whitespace characters; compiled once since Pattern is immutable.
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+    /**
+     * @param input the tweet text
+     * @return every hashtag found in {@code input}, in order of appearance, without the leading '#'
+     */
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Expands every completion candidate into (prefix, candidate) pairs and keeps, per prefix, the
+   * candidates selected by a {@code TopNByKey} accumulation with N = {@code candidatesPerPrefix}.
+   * Lower latency, but more expensive.
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+        WindowedStream<CompletionCandidate> input)
+    {
+      TopNByKey topNByKey = new TopNByKey();
+      topNByKey.setN(candidatesPerPrefix);
+      // NOTE(review): prefixes are capped at length 3 here; confirm this limit is intended.
+      return input
+          .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
+          .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+            {
+              // TODO: Should be removed after Auto-wrapping is supported.
+              return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+            }
+          }, name("TopNByKey"));
+    }
+  }
+
+  /**
+   * FlatMap function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+   * KeyValPairs of the lower-cased prefix and the CompletionCandidate. Prefix lengths run from
+   * {@code minPrefix} to {@code min(hashtag length, maxPrefix)}, inclusive.
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    public AllPrefixes()
+    {
+      this(0, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+      String word = input.getValue();
+      int longest = Math.min(word.length(), maxPrefix);
+      for (int i = minPrefix; i <= longest; i++) {
+        // Locale.ROOT keeps the grouping key independent of the JVM default locale (e.g. Turkish dotless i).
+        result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(java.util.Locale.ROOT), input));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    // NOTE(review): stored but not read by compose(); only the flat strategy is implemented.
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    /**
+     * Counts occurrences of each token, converts each count into a {@link CompletionCandidate}, and
+     * selects the top candidates per prefix via {@link ComputeTopFlat} (minimum prefix length 1).
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("Hashtag Count"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("KeyValPair to CompletionCandidate"));
+
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+    }
+  }
+
+  /**
+   * FilterFunction to filter out tweets with non-ASCII characters.
+   */
+  static class ASCIIFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      return StringUtils.isAscii(input);
+    }
+  }
+
+  /**
+   * Populate the dag with High-Level API: sample tweets, keep ASCII-only ones, extract hashtags,
+   * apply a global window with accumulating early firings every 10 seconds, and compute the top 10
+   * completions per prefix, printing the result.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TwitterSampleInput input = new TwitterSampleInput();
+
+    WindowOption windowOption = new WindowOption.GlobalWindow();
+
+    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
+        .filter(new ASCIIFilter(), name("ACSII Filter"))
+        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
+
+    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
+        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
+
+    s.populateDag(dag);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a017dfaa/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --cc examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index bfdb268,0000000..937476e
mode 100644,000000..100644
--- a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@@ -1,285 -1,0 +1,285 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.ArrayList;
+import java.util.List;
+
++import org.apache.apex.malhar.lib.function.Function;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
- import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * list of play names in which that word appears.
+ *
+ * <p>Concepts: the combine transform, which lets you combine the values in a
+ * key-grouped Collection.
+ *
+ * <p>Here the input is simulated by {@link SampleInput} rather than read from BigQuery.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "CombinePerKeyExamples")
+public class CombinePerKeyExamples implements StreamingApplication
+{
+  // Use the shakespeare public BigQuery sample
+  // NOTE(review): not referenced in this example; the data comes from SampleInput.
+  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 0;
+
+  /**
+   * Examines each input bean. If the word is at least MIN_WORD_LENGTH characters long,
+   * outputs (word, play_name); otherwise returns null.
+   */
+  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
+  {
+    @Override
+    public KeyValPair<String, String> f(SampleBean input)
+    {
+      String playName = input.getCorpus();
+      String word = input.getWord();
+      if (word.length() >= MIN_WORD_LENGTH) {
+        return new KeyValPair<>(word, playName);
+      } else {
+        // NOTE(review): a null result would presumably be emitted downstream by map(); this is
+        // unreachable while MIN_WORD_LENGTH is 0, but would need a filter step if it is raised.
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Converts a reduced (word, play names) pair back into a {@link SampleBean}.
+   */
+  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
+  {
+    @Override
+    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
+    {
+      return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
+    }
+  }
+
+  /**
+   * A reduce function that joins two strings with ", ".
+   */
+  public static class Concat extends ReduceFn<String>
+  {
+    @Override
+    public String reduce(String input1, String input2)
+    {
+      return input1 + ", " + input2;
+    }
+  }
+
+  /**
+   * Reads the public 'Shakespeare' data, and for each word in the dataset
+   * over a given length, generates a string containing the list of play names
+   * in which that word appears.
+   */
+  private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
+  {
+    @Override
+    public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+    {
+      return inputStream
+          // Extract (word, play) pairs from the input SampleBean stream.
+          .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
+
+          // Apply window and trigger option to the streams.
+          .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+          // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
+          .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String, String>, String, String>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
+            {
+              return new Tuple.PlainTuple<>(input);
+            }
+          }, name("Concat"))
+
+          // Format the output back to a SampleBean object.
+          .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
+    }
+  }
+
+  /**
+   * A Java Bean class that contains information about a word that appears in a corpus written by Shakespeare.
+   */
+  public static class SampleBean
+  {
+    public SampleBean()
+    {
+
+    }
+
+    public SampleBean(String word, String corpus)
+    {
+      this.word = word;
+      this.corpus = corpus;
+    }
+
+    @Override
+    public String toString()
+    {
+      return this.word + " : " + this.corpus;
+    }
+
+    private String word;
+
+    private String corpus;
+
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    public String getWord()
+    {
+      return word;
+    }
+
+    public void setCorpus(String corpus)
+    {
+      this.corpus = corpus;
+    }
+
+    public String getCorpus()
+    {
+      return corpus;
+    }
+  }
+
+  /**
+   * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
+   * data. Emits every (word, corpus) combination exactly once, pausing 50 ms between tuples.
+   */
+  public static class SampleInput extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort<>();
+    private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
+    private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
+    // Completed passes over words x corpuses. Per instance, so operators sharing a JVM don't share it.
+    private int i;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      i = 0;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      while (i < 1) {
+        for (String word : words) {
+          for (String corpus : corpuses) {
+            try {
+              Thread.sleep(50);
+            } catch (InterruptedException e) {
+              // Preserve the interrupt for the engine; remaining tuples are then emitted without the pause.
+              Thread.currentThread().interrupt();
+            }
+            beanOutput.emit(new SampleBean(word, corpus));
+          }
+        }
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Terminal operator that records every received {@link SampleBean} in a static list, exposed through
+   * {@link #getResult()}, and raises a static flag, exposed through {@link #isDone()}, once a bean whose
+   * word is "F" has been recorded.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static List<SampleBean> result;
+    // volatile: written by the operator thread and presumably polled from another thread via isDone().
+    private static volatile boolean done = false;
+
+    public static List<SampleBean> getResult()
+    {
+      return result;
+    }
+
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      result = new ArrayList<>();
+      done = false;
+    }
+
+    public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
+    {
+      @Override
+      public void process(SampleBean tuple)
+      {
+        // Record first so the tuple is already in the list when done becomes visible.
+        result.add(tuple);
+        if (tuple.getWord().equals("F")) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  /**
+   * Populate dag using High-Level API: SampleInput -> PlaysForWord -> console print -> Collector.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SampleInput input = new SampleInput();
+    Collector collector = new Collector();
+    StreamFactory.fromInput(input, input.beanOutput, name("input"))
+        .addCompositeStreams(new PlaysForWord())
+        .print(name("console"))
+        .endWith(collector, collector.input, name("Collector"))
+        .populateDag(dag);
+  }
+}