You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:17 UTC
[12/30] apex-malhar git commit: Renamed demos to examples. Packages
and artifactid names are changed as suggested.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
new file mode 100644
index 0000000..2db59b6
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ * This application is identical to TwitterAutoComplete, except it's
+ * reading from a file. This application is mainly for local testing
+ * purpose.
+ *
+ * <p>This will update the datastore every 10 seconds based on the last
+ * 30 minutes of data received.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "AutoComplete")
+public class AutoComplete implements StreamingApplication
+{
+
+ /**
+ * A dummy Twitter input operator. It reads from a text file containing some tweets and output a line every
+ * half of a second.
+ */
+ public static class TweetsInput extends BaseOperator implements InputOperator
+ {
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+ private boolean done;
+
+ private transient BufferedReader reader;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ done = false;
+ initReader();
+ }
+
+ private void initReader()
+ {
+ try {
+ InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
+ reader = new BufferedReader(new InputStreamReader(resourceStream));
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ IOUtils.closeQuietly(reader);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (!done) {
+ try {
+ String line = reader.readLine();
+ if (line == null) {
+ done = true;
+ reader.close();
+ } else {
+ this.output.emit(line);
+ }
+ Thread.sleep(50);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } catch (InterruptedException e) {
+ // Ignore it.
+ }
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+ private static boolean done = false;
+
+ public static boolean isDone()
+ {
+ return done;
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ done = false;
+ }
+
+ public static Map<String, List<CompletionCandidate>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
+ {
+ if (tuple.getValue().getKey().equals("yarn")) {
+ done = true;
+ }
+ result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+ }
+ };
+ }
+
+ /**
+ * FlapMap Function to extract all hashtags from a text form tweet.
+ */
+ private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+ {
+
+ @Override
+ public Iterable<String> f(String input)
+ {
+ List<String> result = new LinkedList<>();
+ Matcher m = Pattern.compile("#\\S+").matcher(input);
+ while (m.find()) {
+ result.add(m.group().substring(1));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+ WindowedStream<CompletionCandidate> input)
+ {
+ return input
+ .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
+ .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
+ CompletionCandidate>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+ {
+ // TODO: Should be removed after Auto-wrapping is supported.
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+ }
+ });
+ }
+ }
+
+ /**
+ * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+ * KeyValPairs of the prefix and the CompletionCandidate
+ */
+ private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+ {
+ private final int minPrefix;
+ private final int maxPrefix;
+
+ public AllPrefixes()
+ {
+ this(0, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix)
+ {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix, int maxPrefix)
+ {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+
+ @Override
+ public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+ {
+ List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+ String word = input.getValue();
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+
+ result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A Composite stream transform that takes as input a list of tokens and returns
+ * the most common tokens per prefix.
+ */
+ public static class ComputeTopCompletions
+ extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+ {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+ {
+ ApexStream<CompletionCandidate> candidates = inputStream
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+ }
+ }, name("countByKey"))
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+ {
+ @Override
+ public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+ }
+ }, name("ToCompletionCandidate"));
+
+ return candidates.addCompositeStreams(new ComputeTopFlat(10, 1));
+
+ }
+ }
+
+ /**
+ * Populate the dag with High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TweetsInput input = new TweetsInput();
+ Collector collector = new Collector();
+
+ WindowOption windowOption = new WindowOption.GlobalWindow();
+
+ ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
+ .flatMap(new ExtractHashtags());
+
+ tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+ .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
+ .endWith(collector, collector.input, name("collector"))
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
new file mode 100644
index 0000000..bd5c511
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Class used to store tag-count pairs in Auto Complete Example.
+ *
+ * @since 3.5.0
+ */
+public class CompletionCandidate implements Comparable<CompletionCandidate>
+{
+ private long count;
+ private String value;
+
+ public CompletionCandidate(String value, long count)
+ {
+ this.value = value;
+ this.count = count;
+ }
+
+ public long getCount()
+ {
+ return count;
+ }
+
+ public String getValue()
+ {
+ return value;
+ }
+
+ // Empty constructor required for Kryo.
+ public CompletionCandidate()
+ {
+
+ }
+
+ @Override
+ public int compareTo(CompletionCandidate o)
+ {
+ if (this.count < o.count) {
+ return -1;
+ } else if (this.count == o.count) {
+ return this.value.compareTo(o.value);
+ } else {
+ return 1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ if (other instanceof CompletionCandidate) {
+ CompletionCandidate that = (CompletionCandidate)other;
+ return this.count == that.count && this.value.equals(that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Long.valueOf(count).hashCode() ^ value.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "CompletionCandidate[" + value + ", " + count + "]";
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
new file mode 100644
index 0000000..ee15d90
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
/**
 * Mutable bean with a single string property. It is the tuple type written by the JDBC output operator of
 * StreamingWordExtract.
 *
 * @since 3.5.0
 */
public class PojoEvent
{
  private String stringValue;

  public String getStringValue()
  {
    return stringValue;
  }

  public void setStringValue(String newString)
  {
    stringValue = newString;
  }

  @Override
  public String toString()
  {
    return new StringBuilder("PojoEvent [stringValue=")
        .append(getStringValue())
        .append(']')
        .toString();
  }
}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..07f01d0
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static java.sql.Types.VARCHAR;
+
+/**
+ * Beam StreamingWordExtract Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "StreamingWordExtract")
+public class StreamingWordExtract implements StreamingApplication
+{
+ private static int wordCount = 0; // A counter to count number of words have been extracted.
+ private static int entriesMapped = 0; // A counter to count number of entries have been mapped.
+
+ public int getWordCount()
+ {
+ return wordCount;
+ }
+
+ public int getEntriesMapped()
+ {
+ return entriesMapped;
+ }
+
+ /**
+ * A MapFunction that tokenizes lines of text into individual words.
+ */
+ public static class ExtractWords implements Function.FlatMapFunction<String, String>
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
+ wordCount += result.size();
+ return result;
+ }
+ }
+
+
+ /**
+ * A MapFunction that uppercases a word.
+ */
+ public static class Uppercase implements Function.MapFunction<String, String>
+ {
+ @Override
+ public String f(String input)
+ {
+ return input.toUpperCase();
+ }
+ }
+
+
+ /**
+ * A filter function to filter out empty strings.
+ */
+ public static class EmptyStringFilter implements Function.FilterFunction<String>
+ {
+ @Override
+ public boolean f(String input)
+ {
+ return !input.isEmpty();
+ }
+ }
+
+
+ /**
+ * A map function to map the result string to a pojo entry.
+ */
+ public static class PojoMapper implements Function.MapFunction<String, Object>
+ {
+
+ @Override
+ public Object f(String input)
+ {
+ PojoEvent pojo = new PojoEvent();
+ pojo.setStringValue(input);
+ entriesMapped++;
+ return pojo;
+ }
+ }
+
+ /**
+ * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
+ */
+ private static List<JdbcFieldInfo> addFieldInfos()
+ {
+ List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
+ fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+ return fieldInfos;
+ }
+
+ /**
+ * Populate dag with High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+ jdbcOutput.setFieldInfos(addFieldInfos());
+ JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+ jdbcOutput.setStore(outputStore);
+ jdbcOutput.setTablename("TestTable");
+
+ // Create a stream reading from a folder.
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
+
+ // Extract all the words from the input line of text.
+ stream.flatMap(new ExtractWords())
+
+ // Filter out the empty strings.
+ .filter(new EmptyStringFilter())
+
+ // Change every word to uppercase.
+ .map(new Uppercase())
+
+ // Map the resulted word to a Pojo entry.
+ .map(new PojoMapper())
+
+ // Output the entries to JdbcOutput and insert them into a table.
+ .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
+
+ stream.populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
new file mode 100644
index 0000000..937254c
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Specialized TopNByKey accumulation for AutoComplete Example.
+ *
+ * @since 3.5.0
+ */
+public class TopNByKey implements
+ Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
+{
+ int n = 10;
+
+ Comparator comparator;
+
+ public void setN(int n)
+ {
+ this.n = n;
+ }
+
+ public void setComparator(Comparator comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public Map<String, Long> defaultAccumulatedValue()
+ {
+ return new HashMap<>();
+ }
+
+ /**
+ * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a
+ * new entry otherwise.
+ * @param accumulatedValue
+ * @param input
+ * @return
+ */
+ @Override
+ public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
+ {
+ accumulatedValue.put(input.getValue(), input.getCount());
+ return accumulatedValue;
+ }
+
+ /**
+ * Merge two Maps together. For every key, keep the larger value in the resulted Map.
+ * @param accumulatedValue1
+ * @param accumulatedValue2
+ * @return
+ */
+ @Override
+ public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
+ {
+ for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
+ if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) {
+ continue;
+ }
+ accumulatedValue1.put(entry.getKey(), entry.getValue());
+ }
+ return accumulatedValue1;
+ }
+
+ /**
+ * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing
+ * those entries.
+ * @param accumulatedValue
+ * @return
+ */
+ @Override
+ public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
+ {
+ LinkedList<CompletionCandidate> result = new LinkedList<>();
+ for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
+ int k = 0;
+ for (CompletionCandidate inMemory : result) {
+ if (entry.getValue() > inMemory.getCount()) {
+ break;
+ }
+ k++;
+ }
+ result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue()));
+ if (result.size() > n) {
+ result.remove(result.get(result.size() - 1));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
+ {
+ return new LinkedList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..68ec733
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TopWikipediaSessions Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TopWikipediaSessions")
+public class TopWikipediaSessions implements StreamingApplication
+{
+ /**
+ * A generator that outputs a stream of combinations of some users and some randomly generated edit time.
+ */
+ public static class SessionGen extends BaseOperator implements InputOperator
+ {
+ private String[] names = new String[]{"user1", "user2", "user3", "user4"};
+ public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
+
+ private static final Duration RAND_RANGE = Duration.standardDays(365);
+ private Long minTimestamp;
+ private long sleepTime;
+ private static int tupleCount = 0;
+
+ public static int getTupleCount()
+ {
+ return tupleCount;
+ }
+
+ private String randomName(String[] names)
+ {
+ int index = new Random().nextInt(names.length);
+ return names[index];
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ tupleCount = 0;
+ minTimestamp = System.currentTimeMillis();
+ sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+ long randomTimestamp = minTimestamp + randMillis;
+ output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
+ tupleCount++;
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // Ignore it.
+ }
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private final int resultSize = 5;
+ private static List<List<TempWrapper>> result = new ArrayList<>();
+
+ public static List<List<TempWrapper>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
+ {
+ if (result.size() == resultSize) {
+ result.remove(0);
+ }
+ result.add(tuple.getValue());
+ }
+ };
+ }
+
+
+ /**
+ * Convert the upstream (user, time) combination to a timestamped tuple of user.
+ */
+ static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
+ {
+ @Override
+ public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
+ {
+ long timestamp = input.getValue();
+ String userName = input.getKey();
+
+ // Sets the implicit timestamp field to be used in windowing.
+ return new Tuple.TimestampedTuple<>(timestamp, userName);
+
+ }
+ }
+
+ /**
+ * Computes the number of edits in each user session. A session is defined as
+ * a string of edits where each is separated from the next by less than an hour.
+ */
+ static class ComputeSessions
+ extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
+ {
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
+ {
+ return inputStream
+
+ // Chuck the stream into session windows.
+ .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // Count the number of edits for a user within one session.
+ .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+ {
+ @Override
+ public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+ {
+ return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
+ }
+ }, name("ComputeSessions"));
+ }
+ }
+
+ /**
+ * A comparator class used for comparing two TempWrapper objects.
+ */
+ public static class Comp implements Comparator<TempWrapper>
+ {
+ @Override
+ public int compare(TempWrapper o1, TempWrapper o2)
+ {
+ return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
+ }
+ }
+
+ /**
+ * A function to extract timestamp from a TempWrapper object.
+ */
+ // TODO: Need to revisit and change back to using TimestampedTuple.
+ public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
+ {
+ @Override
+ public Long apply(@Nullable TempWrapper input)
+ {
+ return input.getTimestamp();
+ }
+ }
+
+ /**
+ * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
+ * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can
+ * remove this class.
+ */
+ public static class TempWrapper
+ {
+ private KeyValPair<String, Long> value;
+ private Long timestamp;
+
+ public TempWrapper()
+ {
+
+ }
+
+ public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
+ {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.value + " - " + this.timestamp;
+ }
+
+ public Long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public KeyValPair<String, Long> getValue()
+ {
+ return value;
+ }
+
+ public void setValue(KeyValPair<String, Long> value)
+ {
+ this.value = value;
+ }
+ }
+
+ /**
+ * Computes the longest session ending in each month, in this case we use 30 days to represent every month.
+ */
+ private static class TopPerMonth
+ extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+ {
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
+ {
+ TopN<TempWrapper> topN = new TopN<>();
+ topN.setN(10);
+ topN.setComparator(new Comp());
+
+ return inputStream
+
+ // Map the input WindowedTuple to a TempWrapper object.
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
+ {
+ @Override
+ public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ Window window = input.getWindows().iterator().next();
+ return new TempWrapper(input.getValue(), window.getBeginTimestamp());
+ }
+ }, name("TempWrapper"))
+
+ // Apply window and trigger option again, this time chuck the stream into fixed time windows.
+ .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
+
+ // Compute the top 10 user-sessions with most number of edits.
+ .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
+ }
+ }
+
+ /**
+ * A map function that combine the user and his/her edit session together to a string and use that string as a key
+ * with number of edits in that session as value to create a new key value pair to send to downstream.
+ */
+ static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
+ {
+ @Override
+ public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ Window window = input.getWindows().iterator().next();
+ return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>(
+ input.getValue().getKey() + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(),
+ input.getValue().getValue()));
+ }
+ }
+
+ /**
+ * A flatmap function that turns the result into readable format.
+ */
+ static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
+ {
+ @Override
+ public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
+ {
+ ArrayList<String> result = new ArrayList<>();
+ for (TempWrapper item : input.getValue()) {
+ String session = item.getValue().getKey();
+ long count = item.getValue().getValue();
+ Window window = input.getWindows().iterator().next();
+ result.add(session + " + " + count + " : " + window.getBeginTimestamp());
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A composite transform that compute the top wikipedia sessions.
+ */
+ public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+ {
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
+ {
+ return inputStream
+ .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
+ .addCompositeStreams(new ComputeSessions())
+ .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
+ .addCompositeStreams(new TopPerMonth());
+ }
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ SessionGen sg = new SessionGen();
+ Collector collector = new Collector();
+ StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
+ .addCompositeStreams(new ComputeTopSessions())
+ .print(name("console"))
+ .endWith(collector, collector.input, name("collector")).populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
new file mode 100644
index 0000000..e6a53d6
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TrafficRoutes example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TrafficRoutes")
+public class TrafficRoutes implements StreamingApplication
+{
+ static Map<String, String> sdStations = buildStationInfo();
+ static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
+ static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
+
+ /**
+ * This class holds information about a station reading's average speed.
+ */
+ public static class StationSpeed implements Comparable<StationSpeed>
+ {
+ @Nullable
+ String stationId;
+ @Nullable
+ Double avgSpeed;
+ @Nullable
+ Long timestamp;
+
+ public StationSpeed() {}
+
+ public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
+ {
+ this.stationId = stationId;
+ this.avgSpeed = avgSpeed;
+ this.timestamp = timestamp;
+ }
+
+ public void setAvgSpeed(@Nullable Double avgSpeed)
+ {
+ this.avgSpeed = avgSpeed;
+ }
+
+ public void setStationId(@Nullable String stationId)
+ {
+ this.stationId = stationId;
+ }
+
+ public void setTimestamp(@Nullable Long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ @Nullable
+ public Long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public String getStationId()
+ {
+ return this.stationId;
+ }
+
+ public Double getAvgSpeed()
+ {
+ return this.avgSpeed;
+ }
+
+ @Override
+ public int compareTo(StationSpeed other)
+ {
+ return Long.compare(this.timestamp, other.timestamp);
+ }
+ }
+
+ /**
+ * This class holds information about a route's speed/slowdown.
+ */
+ static class RouteInfo
+ {
+ @Nullable
+ String route;
+ @Nullable
+ Double avgSpeed;
+ @Nullable
+ Boolean slowdownEvent;
+
+ public RouteInfo()
+ {
+
+ }
+
+ public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
+ {
+ this.route = route;
+ this.avgSpeed = avgSpeed;
+ this.slowdownEvent = slowdownEvent;
+ }
+
+ public String getRoute()
+ {
+ return this.route;
+ }
+
+ public Double getAvgSpeed()
+ {
+ return this.avgSpeed;
+ }
+
+ public Boolean getSlowdownEvent()
+ {
+ return this.slowdownEvent;
+ }
+ }
+
+ /**
+ * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
+ * with the extracted timestamp.
+ */
+ static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+ {
+
+ @Override
+ public Tuple.TimestampedTuple<String> f(String input)
+ {
+ String[] items = input.split(",");
+ String timestamp = tryParseTimestamp(items);
+
+ return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
+ }
+ }
+
+ /**
+ * Filter out readings for the stations along predefined 'routes', and output
+ * (station, speed info) keyed on route.
+ */
+ static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
+ {
+
+ @Override
+ public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
+ {
+
+ ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
+ String[] items = input.getValue().split(",");
+ String stationType = tryParseStationType(items);
+ // For this analysis, use only 'main line' station types
+ if (stationType != null && stationType.equals("ML")) {
+ Double avgSpeed = tryParseAvgSpeed(items);
+ String stationId = tryParseStationId(items);
+ // For this simple example, filter out everything but some hardwired routes.
+ if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
+ StationSpeed stationSpeed =
+ new StationSpeed(stationId, avgSpeed, input.getTimestamp());
+ // The tuple key is the 'route' name stored in the 'sdStations' hash.
+ KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed);
+ result.add(outputValue);
+ }
+ }
+ return result;
+ }
+ }
+
+ /**
+ * For a given route, track average speed for the window. Calculate whether
+ * traffic is currently slowing down, via a predefined threshold. If a supermajority of
+ * speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
+ * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+ */
+ static class GatherStats
+ implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
+ {
+ @Override
+ public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
+ {
+ ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
+ String route = input.getValue().getKey();
+ double speedSum = 0.0;
+ int speedCount = 0;
+ int speedups = 0;
+ int slowdowns = 0;
+ List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
+ // StationSpeeds sort by embedded timestamp.
+ Collections.sort(infoList);
+ Map<String, Double> prevSpeeds = new HashMap<>();
+ // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
+ for (StationSpeed item : infoList) {
+ Double speed = item.getAvgSpeed();
+ if (speed != null) {
+ speedSum += speed;
+ speedCount++;
+ Double lastSpeed = prevSpeeds.get(item.getStationId());
+ if (lastSpeed != null) {
+ if (lastSpeed < speed) {
+ speedups += 1;
+ } else {
+ slowdowns += 1;
+ }
+ }
+ prevSpeeds.put(item.getStationId(), speed);
+ }
+ }
+ if (speedCount == 0) {
+ // No average to compute.
+ return result;
+ }
+ double speedAvg = speedSum / speedCount;
+ boolean slowdownEvent = slowdowns >= 2 * speedups;
+ RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
+ result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
+ return result;
+ }
+ }
+
+ /**
+ * Output Pojo class for outputting result to JDBC.
+ */
+ static class OutputPojo
+ {
+ private Double avgSpeed;
+ private Boolean slowdownEvent;
+ private String key;
+ private Long timestamp;
+
+ public OutputPojo()
+ {
+ }
+
+ public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
+ {
+ this.avgSpeed = avgSpeed;
+ this.slowdownEvent = slowdownEvent;
+ this.key = key;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString()
+ {
+ return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
+ }
+
+ public void setTimestamp(Long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ public Long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setAvgSpeed(Double avgSpeed)
+ {
+ this.avgSpeed = avgSpeed;
+ }
+
+ public Double getAvgSpeed()
+ {
+ return avgSpeed;
+ }
+
+ public void setKey(String key)
+ {
+ this.key = key;
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public void setSlowdownEvent(Boolean slowdownEvent)
+ {
+ this.slowdownEvent = slowdownEvent;
+ }
+
+ public Boolean getSlowdownEvent()
+ {
+ return slowdownEvent;
+ }
+
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
+
+ public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
+ {
+ return result;
+ }
+
+ public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
+ {
+ @Override
+ public void process(OutputPojo tuple)
+ {
+ result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
+ }
+ };
+ }
+
+ /**
+ * Format the results of the slowdown calculations to a OutputPojo.
+ */
+ static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
+ {
+ @Override
+ public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
+ {
+ RouteInfo routeInfo = input.getValue().getValue();
+ OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp());
+ return row;
+ }
+ }
+
+
+ /**
+ * This composite transformation extracts speed info from traffic station readings.
+ * It groups the readings by 'route' and analyzes traffic slowdown for that route.
+ * Lastly, it formats the results for JDBC.
+ */
+ static class TrackSpeed extends
+ CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
+ {
+ @Override
+ public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
+ {
+ // Apply a GroupByKey transform to collect a list of all station
+ // readings for a given route.
+ WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
+ inputStream
+ .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
+ {
+ return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
+ }
+ }, name("GroupByKey"));
+
+ // Analyze 'slowdown' over the route readings.
+ WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
+ .flatMap(new GatherStats(), name("GatherStats"));
+
+ // Format the results for writing to JDBC table.
+ WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
+
+ return results;
+ }
+ }
+
+
+ private static Double tryParseAvgSpeed(String[] inputItems)
+ {
+ try {
+ return Double.parseDouble(tryParseString(inputItems, 3));
+ } catch (NumberFormatException e) {
+ return null;
+ } catch (NullPointerException e) {
+ return null;
+ }
+ }
+
+ private static String tryParseStationType(String[] inputItems)
+ {
+ return tryParseString(inputItems, 2);
+ }
+
+ private static String tryParseStationId(String[] inputItems)
+ {
+ return tryParseString(inputItems, 1);
+ }
+
+ private static String tryParseTimestamp(String[] inputItems)
+ {
+ return tryParseString(inputItems, 0);
+ }
+
+ private static String tryParseString(String[] inputItems, int index)
+ {
+ return inputItems.length >= index ? inputItems[index] : null;
+ }
+
+ /**
+ * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
+ */
+ private static Map<String, String> buildStationInfo()
+ {
+ Map<String, String> stations = new Hashtable<String, String>();
+ stations.put("1108413", "SDRoute1"); // from freeway 805 S
+ stations.put("1108699", "SDRoute2"); // from freeway 78 E
+ stations.put("1108702", "SDRoute2");
+ return stations;
+ }
+
+ /**
+ * A dummy generator to generate some traffic information.
+ */
+ public static class InfoGen extends BaseOperator implements InputOperator
+ {
+ public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+ private String[] stationTypes = new String[]{"ML", "BL", "GL"};
+ private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
+ private double ave = 55.0;
+ private long timestamp;
+ private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+ private static int tupleCount = 0;
+
+ public static int getTupleCount()
+ {
+ return tupleCount;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ tupleCount = 0;
+ timestamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ for (String stationType : stationTypes) {
+ for (int stationID : stationIDs) {
+ double speed = Math.random() * 20 + ave;
+ long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
+ try {
+ output.emit(time + "," + stationID + "," + stationType + "," + speed);
+ tupleCount++;
+
+ Thread.sleep(50);
+ } catch (Exception e) {
+ // Ignore it
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ InfoGen infoGen = new InfoGen();
+ Collector collector = new Collector();
+
+ // Create a stream from the input operator.
+ ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
+
+ // Extract the timestamp from the input and wrap it into a TimestampedTuple.
+ .map(new ExtractTimestamps(), name("ExtractTimestamps"));
+
+ stream
+ // Extract the average speed of a station.
+ .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
+
+ // Apply window and trigger option.
+ .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
+
+ // Apply TrackSpeed composite transformation to compute the route information.
+ .addCompositeStreams(new TrackSpeed())
+
+ // print the result to console.
+ .print(name("console"))
+ .endWith(collector, collector.input, name("Collector"))
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
new file mode 100644
index 0000000..6332c66
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
 * Auto-complete hashtag example with real-time Twitter input. To run this application, create an app
 * at https://apps.twitter.com, generate your consumer and access keys and tokens, and enter that information
 * in /resources/META-INF/properties.xml.
 *
 * Authentication requires the following four values:
 * your application consumer key,
 * your application consumer secret,
 * your Twitter access token, and
 * your Twitter access token secret.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TwitterAutoComplete")
+public class TwitterAutoComplete implements StreamingApplication
+{
+ /**
+ * Check whether every character in a string is ASCII encoding.
+ */
+ public static class StringUtils
+ {
+ static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
+
+ public static boolean isAscii(String v)
+ {
+ return encoder.canEncode(v);
+ }
+ }
+
+ /**
+ * FlapMap Function to extract all hashtags from a text form tweet.
+ */
+ private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+ {
+
+ @Override
+ public Iterable<String> f(String input)
+ {
+ List<String> result = new LinkedList<>();
+ Matcher m = Pattern.compile("#\\S+").matcher(input);
+ while (m.find()) {
+ result.add(m.group().substring(1));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Lower latency, but more expensive.
+ */
+ private static class ComputeTopFlat
+ extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ private final int minPrefix;
+
+ public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.minPrefix = minPrefix;
+ }
+
+ @Override
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+ WindowedStream<CompletionCandidate> input)
+ {
+ TopNByKey topNByKey = new TopNByKey();
+ topNByKey.setN(candidatesPerPrefix);
+ return input
+ .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
+ .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+ {
+ // TODO: Should be removed after Auto-wrapping is supported.
+ return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+ }
+ }, name("TopNByKey"));
+ }
+ }
+
+ /**
+ * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+ * KeyValPairs of the prefix and the CompletionCandidate
+ */
+ private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+ {
+ private final int minPrefix;
+ private final int maxPrefix;
+
+ public AllPrefixes()
+ {
+ this(0, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix)
+ {
+ this(minPrefix, Integer.MAX_VALUE);
+ }
+
+ public AllPrefixes(int minPrefix, int maxPrefix)
+ {
+ this.minPrefix = minPrefix;
+ this.maxPrefix = maxPrefix;
+ }
+
+ @Override
+ public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+ {
+ List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+ String word = input.getValue();
+ for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+ result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A Composite stream transform that takes as input a list of tokens and returns
+ * the most common tokens per prefix.
+ */
+ public static class ComputeTopCompletions
+ extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+ {
+ private final int candidatesPerPrefix;
+ private final boolean recursive;
+
+ protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+ {
+ this.candidatesPerPrefix = candidatesPerPrefix;
+ this.recursive = recursive;
+ }
+
+ public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+ {
+ return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+ {
+
+ ApexStream<CompletionCandidate> candidates = inputStream
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+ }
+ }, name("Hashtag Count"))
+ .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+ {
+ @Override
+ public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+ {
+ return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+ }
+ }, name("KeyValPair to CompletionCandidate"));
+
+ return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+
+ }
+ }
+
+ /**
+ * FilterFunction to filter out tweets with non-acsii characters.
+ */
+ static class ASCIIFilter implements Function.FilterFunction<String>
+ {
+ @Override
+ public boolean f(String input)
+ {
+ return StringUtils.isAscii(input);
+ }
+ }
+
+ /**
+ * Populate the dag with High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TwitterSampleInput input = new TwitterSampleInput();
+
+ WindowOption windowOption = new WindowOption.GlobalWindow();
+
+ ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
+ .filter(new ASCIIFilter(), name("ACSII Filter"))
+ .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
+
+ ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
+ tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
+ .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
+
+ s.populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..bfdb268
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * list of play names in which that word appears
+ *
+ * <p>Concepts: the combine transform, which lets you combine the values in a
+ * key-grouped Collection
+ *
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "CombinePerKeyExamples")
+public class CombinePerKeyExamples implements StreamingApplication
+{
+ // Use the shakespeare public BigQuery sample
+ private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
+ // We'll track words >= this word length across all plays in the table.
+ private static final int MIN_WORD_LENGTH = 0;
+
+ /**
+ * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
+ * outputs word, play_name.
+ */
+ static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
+ {
+
+ @Override
+ public KeyValPair<String, String> f(SampleBean input)
+ {
+ String playName = input.getCorpus();
+ String word = input.getWord();
+ if (word.length() >= MIN_WORD_LENGTH) {
+ return new KeyValPair<>(word, playName);
+ } else {
+ return null;
+ }
+ }
+ }
+
+
+ /**
+ * Prepares the output data which is in same bean
+ */
+ static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
+ {
+ @Override
+ public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
+ {
+ return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
+ }
+ }
+
+ /**
+ * A reduce function to concat two strings together.
+ */
+ public static class Concat extends ReduceFn<String>
+ {
+ @Override
+ public String reduce(String input1, String input2)
+ {
+ return input1 + ", " + input2;
+ }
+ }
+
+ /**
+ * Reads the public 'Shakespeare' data, and for each word in the dataset
+ * over a given length, generates a string containing the list of play names
+ * in which that word appears.
+ */
+ private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
+ {
+
+ @Override
+ public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+ {
+ return inputStream
+ // Extract words from the input SampleBeam stream.
+ .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
+
+ // Apply window and trigger option to the streams.
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+ // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
+ .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
+ {
+ return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
+ }
+ }, name("Concat"))
+
+ // Format the output back to a SampleBeam object.
+ .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
+ }
+ }
+
+
+ /**
+ * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare.
+ */
+ public static class SampleBean
+ {
+
+ public SampleBean()
+ {
+
+ }
+
+ public SampleBean(String word, String corpus)
+ {
+ this.word = word;
+ this.corpus = corpus;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.word + " : " + this.corpus;
+ }
+
+ private String word;
+
+ private String corpus;
+
+ public void setWord(String word)
+ {
+ this.word = word;
+ }
+
+ public String getWord()
+ {
+ return word;
+ }
+
+ public void setCorpus(String corpus)
+ {
+ this.corpus = corpus;
+ }
+
+ public String getCorpus()
+ {
+ return corpus;
+ }
+ }
+
+ /**
+ * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
+ * data.
+ */
+ public static class SampleInput extends BaseOperator implements InputOperator
+ {
+
+ public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort();
+ private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
+ private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
+ private static int i;
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ i = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ while (i < 1) {
+ for (String word : words) {
+ for (String corpus : corpuses) {
+ try {
+ Thread.sleep(50);
+ beanOutput.emit(new SampleBean(word, corpus));
+ } catch (Exception e) {
+ // Ignore it
+ }
+ }
+ }
+ i++;
+ }
+
+ }
+ }
+
+ public static class Collector extends BaseOperator
+ {
+ private static List<SampleBean> result;
+ private static boolean done = false;
+
+ public static List<SampleBean> getResult()
+ {
+ return result;
+ }
+
+ public static boolean isDone()
+ {
+ return done;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ result = new ArrayList<>();
+ done = false;
+ }
+
+ public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
+ {
+ @Override
+ public void process(SampleBean tuple)
+ {
+ if (tuple.getWord().equals("F")) {
+ done = true;
+ }
+ result.add(tuple);
+ }
+ };
+ }
+
+ /**
+ * Populate dag using High-Level API.
+ * @param dag
+ * @param conf
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ SampleInput input = new SampleInput();
+ Collector collector = new Collector();
+ StreamFactory.fromInput(input, input.beanOutput, name("input"))
+ .addCompositeStreams(new PlaysForWord())
+ .print(name("console"))
+ .endWith(collector, collector.input, name("Collector"))
+ .populateDag(dag);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
new file mode 100644
index 0000000..4df5fe7
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import org.joda.time.Duration;

import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;

import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam DeDupExample.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "DeDupExample")
+public class DeDupExample implements StreamingApplication
+{
+
+ public static class Collector extends BaseOperator
+ {
+ private static Tuple.WindowedTuple<List<String>> result;
+ private static boolean done = false;
+
+ public static Tuple.WindowedTuple<List<String>> getResult()
+ {
+ return result;
+ }
+
+ public static boolean isDone()
+ {
+ return done;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ result = new Tuple.WindowedTuple<>();
+ done = false;
+ }
+
+ public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
+ {
+ @Override
+ public void process(Tuple.WindowedTuple<List<String>> tuple)
+ {
+ result = tuple;
+ if (result.getValue().contains("bye")) {
+ done = true;
+ }
+ }
+ };
+ }
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Collector collector = new Collector();
+
+ // Create a stream that reads from files in a local folder and output lines one by one to downstream.
+ ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+
+ // Extract all the words from the input line of text.
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+ }
+ }, name("ExtractWords"))
+
+ // Change the words to lower case, also shutdown the app when the word "bye" is detected.
+ .map(new Function.MapFunction<String, String>()
+ {
+ @Override
+ public String f(String input)
+ {
+ return input.toLowerCase();
+ }
+ }, name("ToLowerCase"));
+
+ // Apply window and trigger option.
+ stream.window(new WindowOption.GlobalWindow(),
+ new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
+
+ // Remove the duplicate words and print out the result.
+ .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates"))
+ .print(name("console"))
+ .endWith(collector, collector.input)
+ .populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
new file mode 100644
index 0000000..834964c
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
/**
 * Tuple class for JDBC input of {@link MaxPerKeyExamples}.
 *
 * <p>A plain mutable bean holding a month/day/year date together with a mean
 * temperature value.
 *
 * @since 3.5.0
 */
public class InputPojo
{
  private int month;
  private int day;
  private int year;
  private double meanTemp;

  public int getMonth()
  {
    return month;
  }

  public void setMonth(int month)
  {
    this.month = month;
  }

  public int getDay()
  {
    return this.day;
  }

  public void setDay(int day)
  {
    this.day = day;
  }

  public int getYear()
  {
    return this.year;
  }

  public void setYear(int year)
  {
    this.year = year;
  }

  public double getMeanTemp()
  {
    return this.meanTemp;
  }

  public void setMeanTemp(double meanTemp)
  {
    this.meanTemp = meanTemp;
  }

  /**
   * Renders the bean through its getters, e.g.
   * {@code PojoEvent [month=3, day=14, year=2017, meanTemp=21.5]}.
   */
  @Override
  public String toString()
  {
    StringBuilder text = new StringBuilder("PojoEvent [month=");
    text.append(getMonth());
    text.append(", day=").append(getDay());
    text.append(", year=").append(getYear());
    text.append(", meanTemp=").append(getMeanTemp());
    text.append("]");
    return text.toString();
  }
}