You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:17 UTC

[12/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
new file mode 100644
index 0000000..2db59b6
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ * This application is identical to TwitterAutoComplete, except it's
+ * reading from a file. This application is mainly for local testing
+ * purpose.
+ *
+ * <p>As wired in populateDAG, completions accumulate in one global window and
+ * updated results are fired early via withEarlyFiringsAtEvery(1).
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "AutoComplete")
+public class AutoComplete implements StreamingApplication
+{
+
+  /**
+   * A dummy Twitter input operator. It reads the bundled sample tweets file (decoded as UTF-8) and emits one
+   * line per {@link #emitTuples()} call, sleeping 50 milliseconds after each read until the file is exhausted.
+   */
+  public static class TweetsInput extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done;
+
+    private transient BufferedReader reader;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+
+    private void initReader()
+    {
+      try {
+        InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
+        reader = new BufferedReader(new InputStreamReader(resourceStream, java.nio.charset.StandardCharsets.UTF_8));
+      } catch (Exception ex) {
+        throw Throwables.propagate(ex);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (!done) {
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            done = true;
+            reader.close();
+          } else {
+            this.output.emit(line);
+          }
+          Thread.sleep(50);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the flag rather than swallowing the interruption
+        }
+      }
+    }
+  }
+
+  /** Operator with a single input port that stores the latest completions per prefix in static state. */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+    private static volatile boolean done = false; // volatile: presumably polled from other threads via isDone()
+
+    public static boolean isDone()
+    {
+      return done;
+    }
+    public static Map<String, List<CompletionCandidate>> getResult()
+    {
+      return result;
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+    }
+
+    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
+      {
+        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+        if (tuple.getValue().getKey().equals("yarn")) {
+          done = true; // raised only after the entry is stored, so whoever sees done also finds "yarn"
+        }
+      }
+    };
+  }
+
+  /** FlatMap function that extracts every hashtag (without the leading '#') from the text of a tweet. */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    // Compiled once and shared: Pattern is immutable, so there is no need to recompile it for every tweet.
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive: emits every prefix of each candidate and keeps the top N per prefix.
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<CompletionCandidate> input)
+    {
+      TopNByKey topN = new TopNByKey();
+      topN.setN(candidatesPerPrefix); // was ignored before, so N always stayed at TopNByKey's default
+      return input
+        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
+        .accumulateByKey(topN, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+          {
+            // TODO: Should be removed after Auto-wrapping is supported.
+            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+          }
+        });
+    }
+  }
+
+  /**
+   * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+   * KeyValPairs of the prefix and the CompletionCandidate
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    public AllPrefixes()
+    {
+      this(0, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+      String word = input.getValue();
+      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+
+        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes as input a stream of tokens and returns the most common
+   * tokens per prefix. The {@code recursive} flag is stored but not consulted by {@link #compose}.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("countByKey"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("ToCompletionCandidate"));
+
+      // Honor the configured candidate count instead of a hard-coded 10; the minimum prefix length stays 1.
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+    }
+  }
+
+  /**
+   * Populate the dag with High-Level API.
+   * @param dag the DAG that the stream pipeline is populated into
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TweetsInput tweets = new TweetsInput();
+    Collector sink = new Collector();
+    TriggerOption trigger = new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1);
+
+    // Hashtags extracted from every line emitted by the input operator.
+    ApexStream<String> hashtags = StreamFactory.fromInput(tweets, tweets.output, name("tweetSampler"))
+        .flatMap(new ExtractHashtags());
+
+    hashtags.window(new WindowOption.GlobalWindow(), trigger)
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
+        .endWith(sink, sink.input, name("collector"))
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
new file mode 100644
index 0000000..bd5c511
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Class used to store tag-count pairs in Auto Complete Example.
+ *
+ * @since 3.5.0
+ */
+public class CompletionCandidate implements Comparable<CompletionCandidate>
+{
+  private long count;
+  private String value;
+
+  public CompletionCandidate(String value, long count)
+  {
+    this.value = value;
+    this.count = count;
+  }
+
+  public long getCount()
+  {
+    return count;
+  }
+
+  public String getValue()
+  {
+    return value;
+  }
+
+  // Empty constructor required for Kryo; it leaves value null, which equals and hashCode tolerate.
+  public CompletionCandidate()
+  {
+  }
+
+  /** Orders by ascending count and breaks ties by the natural order of the value. */
+  @Override
+  public int compareTo(CompletionCandidate o)
+  {
+    if (this.count != o.count) {
+      return this.count < o.count ? -1 : 1;
+    }
+    return this.value.compareTo(o.value);
+  }
+
+  @Override
+  public boolean equals(Object other)
+  {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof CompletionCandidate)) {
+      return false;
+    }
+    CompletionCandidate that = (CompletionCandidate)other;
+    // Null-safe: instances created through the no-arg constructor carry a null value.
+    return this.count == that.count && (this.value == null ? that.value == null : this.value.equals(that.value));
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Long.valueOf(count).hashCode() ^ (value == null ? 0 : value.hashCode());
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CompletionCandidate[" + value + ", " + count + "]";
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
new file mode 100644
index 0000000..ee15d90
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Tuple Class for JdbcOutput of StreamingWordExtract.
+ *
+ * @since 3.5.0
+ */
+public class PojoEvent
+{
+  private String stringValue; // the single payload field carried by this POJO
+
+  public String getStringValue()
+  {
+    return stringValue;
+  }
+
+  public void setStringValue(String newString)
+  {
+    stringValue = newString;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [stringValue=" + getStringValue() + "]";
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..07f01d0
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static java.sql.Types.VARCHAR;
+
+/**
+ * Beam StreamingWordExtract Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "StreamingWordExtract")
+public class StreamingWordExtract implements StreamingApplication
+{
+  private static int wordCount = 0; // A counter to count number of words have been extracted.
+  private static int entriesMapped = 0; // A counter to count number of entries have been mapped.
+
+  public int getWordCount()
+  {
+    return wordCount;
+  }
+
+  public int getEntriesMapped()
+  {
+    return entriesMapped;
+  }
+
+  /**
+   * A MapFunction that tokenizes lines of text into individual words.
+   */
+  public static class ExtractWords implements Function.FlatMapFunction<String, String>
+  {
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
+      wordCount += result.size();
+      return result;
+    }
+  }
+
+
+  /**
+   * A MapFunction that uppercases a word.
+   */
+  public static class Uppercase implements Function.MapFunction<String, String>
+  {
+    @Override
+    public String f(String input)
+    {
+      return input.toUpperCase();
+    }
+  }
+
+
+  /**
+   * A filter function to filter out empty strings.
+   */
+  public static class EmptyStringFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      return !input.isEmpty();
+    }
+  }
+
+
+  /**
+   * A map function to map the result string to a pojo entry.
+   */
+  public static class PojoMapper implements Function.MapFunction<String, Object>
+  {
+
+    @Override
+    public Object f(String input)
+    {
+      PojoEvent pojo = new PojoEvent();
+      pojo.setStringValue(input);
+      entriesMapped++;
+      return pojo;
+    }
+  }
+
+  /**
+   * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
+   */
+  private static List<JdbcFieldInfo> addFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
+    fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+    return fieldInfos;
+  }
+
+  /**
+   * Populate dag with High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addFieldInfos());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+    jdbcOutput.setTablename("TestTable");
+
+    // Create a stream reading from a folder.
+    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
+
+    // Extract all the words from the input line of text.
+    stream.flatMap(new ExtractWords())
+
+        // Filter out the empty strings.
+        .filter(new EmptyStringFilter())
+
+        // Change every word to uppercase.
+        .map(new Uppercase())
+
+        // Map the resulted word to a Pojo entry.
+        .map(new PojoMapper())
+
+        // Output the entries to JdbcOutput and insert them into a table.
+        .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
+
+    stream.populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
new file mode 100644
index 0000000..937254c
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Specialized TopNByKey accumulation for AutoComplete Example.
+ *
+ * @since 3.5.0
+ */
+public class TopNByKey implements
+    Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
+{
+  int n = 10; // maximum number of candidates returned by getOutput
+
+  Comparator comparator; // stored by setComparator but not consulted anywhere in this class
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public Map<String, Long> defaultAccumulatedValue()
+  {
+    return new HashMap<>();
+  }
+
+  /**
+   * Accumulate the input. Update the entry in the Accumulation Map if the key of the input already exists,
+   * create a new entry otherwise.
+   * @param accumulatedValue the map to update; it is modified in place
+   * @param input the candidate whose value is used as the key and whose count is stored
+   * @return the same map instance that was passed in
+   */
+  @Override
+  public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
+  {
+    accumulatedValue.put(input.getValue(), input.getCount());
+    return accumulatedValue;
+  }
+
+  /**
+   * Merge two Maps together. For every key, keep the larger value in the resulting Map.
+   * @param accumulatedValue1 the map that receives the merged entries; it is modified in place
+   * @param accumulatedValue2 the map whose entries are merged in
+   * @return accumulatedValue1 after the merge
+   */
+  @Override
+  public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
+  {
+    for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
+      if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) {
+        continue;
+      }
+      accumulatedValue1.put(entry.getKey(), entry.getValue());
+    }
+    return accumulatedValue1;
+  }
+
+  /**
+   * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing
+   * those entries.
+   * @param accumulatedValue map from candidate value to its count
+   * @return at most n candidates, ordered from the highest count to the lowest
+   */
+  @Override
+  public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
+  {
+    LinkedList<CompletionCandidate> result = new LinkedList<>();
+    for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
+      int k = 0;
+      for (CompletionCandidate inMemory : result) {
+        if (entry.getValue() > inMemory.getCount()) {
+          break;
+        }
+        k++;
+      }
+      result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue()));
+      if (result.size() > n) {
+        result.removeLast(); // the list is kept in descending count order, so the tail is the one to drop
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
+  {
+    return new LinkedList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..68ec733
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TopWikipediaSessions Example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TopWikipediaSessions")
+public class TopWikipediaSessions implements StreamingApplication
+{
+  /**
+   * A generator that outputs a stream of combinations of some users and some randomly generated edit time.
+   * Each call to {@link #emitTuples()} emits one (user, timestamp) pair and then sleeps for the
+   * operator's configured spin interval.
+   */
+  public static class SessionGen extends BaseOperator implements InputOperator
+  {
+    private String[] names = new String[]{"user1", "user2", "user3", "user4"};
+    public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
+
+    // Generated edit times fall within this range after the time captured in setup().
+    private static final Duration RAND_RANGE = Duration.standardDays(365);
+    private Long minTimestamp;
+    private long sleepTime;
+    // Number of tuples emitted since the last setup(); static so it can be read without the instance.
+    private static int tupleCount = 0;
+
+    /**
+     * @return the number of tuples emitted since the operator was last set up
+     */
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+
+    /**
+     * Picks one element of {@code names} uniformly at random.
+     */
+    private String randomName(String[] names)
+    {
+      int index = new Random().nextInt(names.length);
+      return names[index];
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      tupleCount = 0;
+      minTimestamp = System.currentTimeMillis();
+      sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+      output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
+      tupleCount++;
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Do not lose the interrupt: restore the flag so the calling thread can still observe it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Terminal operator that keeps the most recently received result lists in a static buffer so that they
+   * can be read back through {@link #getResult()}.
+   */
+  public static class Collector extends BaseOperator
+  {
+    // Maximum number of result lists retained; the oldest one is evicted first.
+    private final int resultSize = 5;
+    private static List<List<TempWrapper>> result = new ArrayList<>();
+
+    /**
+     * @return the shared buffer of collected result lists (the live list, not a copy)
+     */
+    public static List<List<TempWrapper>> getResult()
+    {
+      return result;
+    }
+
+    public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
+      {
+        // Evict the oldest entry once the buffer holds resultSize lists, then append the new one.
+        if (result.size() == resultSize) {
+          result.remove(0);
+        }
+        result.add(tuple.getValue());
+      }
+    };
+  }
+
+
+  /**
+   * Maps an upstream (user, edit time) pair to a timestamped tuple whose value is the user name.
+   */
+  static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
+  {
+    @Override
+    public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
+    {
+      final long editTime = input.getValue();
+      final String user = input.getKey();
+      // The edit time becomes the tuple's implicit timestamp, which windowing uses downstream.
+      return new Tuple.TimestampedTuple<>(editTime, user);
+    }
+  }
+
+  /**
+   * Computes the number of edits in each user session.  A session is defined as
+   * a string of edits where each is separated from the next by less than an hour.
+   * The input is a stream of timestamped user names; the output is a windowed stream of
+   * (user, edit count) pairs.
+   */
+  static class ComputeSessions
+      extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
+    {
+      return inputStream
+
+        // Chunk the stream into session windows with a one-hour gap; panes accumulate and fire early.
+        .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+        // Count the number of edits for a user within one session.
+        .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+        {
+          @Override
+          public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+          {
+            // Every edit contributes a count of 1 under its user name, keeping the original timestamp.
+            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
+          }
+        }, name("ComputeSessions"));
+    }
+  }
+
+  /**
+   * A comparator class used for comparing two TempWrapper objects.
+   */
+  public static class Comp implements Comparator<TempWrapper>
+  {
+    @Override
+    public int compare(TempWrapper o1, TempWrapper o2)
+    {
+      return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
+    }
+  }
+
+  /**
+   * A function to extract timestamp from a TempWrapper object.
+   */
+  // TODO: Need to revisit and change back to using TimestampedTuple.
+  public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
+  {
+    /**
+     * @param input the wrapper to read the timestamp from
+     * @return the timestamp stored in {@code input}
+     */
+    @Override
+    public Long apply(@Nullable TempWrapper input)
+    {
+      // NOTE(review): the parameter is annotated @Nullable but is dereferenced without a check,
+      // so a null input throws NullPointerException here.
+      return input.getTimestamp();
+    }
+  }
+
+  /**
+   * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
+   * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can
+   * remove this class.
+   */
+  public static class TempWrapper
+  {
+    // The wrapped key-value pair.
+    private KeyValPair<String, Long> value;
+    // The timestamp associated with the wrapped pair.
+    private Long timestamp;
+
+    /**
+     * Creates an empty wrapper; both fields stay null until set.
+     */
+    public TempWrapper()
+    {
+
+    }
+
+    /**
+     * @param value the key-value pair to wrap
+     * @param timestamp the timestamp to associate with the pair
+     */
+    public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
+    {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+
+    /**
+     * @return the wrapped pair and the timestamp, separated by {@code "  -  "}
+     */
+    @Override
+    public String toString()
+    {
+      return this.value + "  -  " + this.timestamp;
+    }
+
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    public KeyValPair<String, Long> getValue()
+    {
+      return value;
+    }
+
+    public void setValue(KeyValPair<String, Long> value)
+    {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Computes the top 10 sessions, compared by their edit count, in each month; in this case we use
+   * 30-day fixed windows to represent every month.
+   */
+  private static class TopPerMonth
+      extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+  {
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
+    {
+      // Accumulation that keeps 10 entries, compared with Comp (i.e. by the pair's numeric value).
+      TopN<TempWrapper> topN = new TopN<>();
+      topN.setN(10);
+      topN.setComparator(new Comp());
+
+      return inputStream
+
+        // Map the input WindowedTuple to a TempWrapper object.
+        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
+        {
+          @Override
+          public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+          {
+            // Only the first window of the tuple is used; its begin timestamp becomes the wrapper's timestamp.
+            Window window = input.getWindows().iterator().next();
+            return new TempWrapper(input.getValue(), window.getBeginTimestamp());
+          }
+        }, name("TempWrapper"))
+
+        // Apply window and trigger option again, this time chunk the stream into fixed time windows.
+        .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
+
+        // Compute the top 10 user-sessions with most number of edits.
+        .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
+    }
+  }
+
+  /**
+   * Rewrites each windowed (user, edit count) pair so that the key also identifies the session: the new key
+   * is "user : window begin timestamp : window duration in millis", while the edit count stays the value.
+   */
+  static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
+  {
+    @Override
+    public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      // Only the first window attached to the tuple is considered.
+      Window session = input.getWindows().iterator().next();
+      String sessionKey = input.getValue().getKey()  + " : " + session.getBeginTimestamp() + " : " + session.getDurationMillis();
+      KeyValPair<String, Long> editsPerSession = new KeyValPair<String, Long>(sessionKey, input.getValue().getValue());
+      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(session, editsPerSession);
+    }
+  }
+
+  /**
+   * A flatmap function that renders every entry of a windowed result list as one readable line of the
+   * form "session + count : window begin timestamp".
+   */
+  static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
+  {
+    @Override
+    public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
+    {
+      ArrayList<String> lines = new ArrayList<>();
+      for (TempWrapper entry : input.getValue()) {
+        KeyValPair<String, Long> sessionCount = entry.getValue();
+        String session = sessionCount.getKey();
+        long count = sessionCount.getValue();
+        // The window is looked up per entry, so an empty list never touches the window set.
+        Window window = input.getWindows().iterator().next();
+        lines.add(session + " + " + count + " : " + window.getBeginTimestamp());
+      }
+      return lines;
+    }
+  }
+
+  /**
+   * A composite transform that computes the top wikipedia sessions. It chains, in order:
+   * timestamp extraction, per-user session counting, session key formatting and the per-month top-N.
+   */
+  public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
+    {
+      return inputStream
+        // (user, time) pair -> timestamped user name.
+        .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
+        // Count edits per user within session windows.
+        .addCompositeStreams(new ComputeSessions())
+        // Fold the session window information into the key.
+        .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
+        // Keep the top sessions per 30-day window.
+        .addCompositeStreams(new TopPerMonth());
+    }
+  }
+
+  /**
+   * Builds the application: the session generator feeds the ComputeTopSessions transform, whose
+   * results are printed to the console and handed to the Collector.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SessionGen sg = new SessionGen();
+    Collector collector = new Collector();
+    StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
+      .addCompositeStreams(new ComputeTopSessions())
+      .print(name("console"))
+      .endWith(collector, collector.input, name("collector")).populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
new file mode 100644
index 0000000..e6a53d6
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Group;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TrafficRoutes example.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TrafficRoutes")
+public class TrafficRoutes implements StreamingApplication
+{
+  static Map<String, String> sdStations = buildStationInfo();
+  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
+  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
+
+  /**
+   * This class holds information about a station reading's average speed.
+   * Instances order themselves by timestamp; readings without a timestamp sort first.
+   */
+  public static class StationSpeed implements Comparable<StationSpeed>
+  {
+    @Nullable
+    String stationId;
+    @Nullable
+    Double avgSpeed;
+    @Nullable
+    Long timestamp;
+
+    public StationSpeed() {}
+
+    /**
+     * @param stationId identifier of the station the reading belongs to
+     * @param avgSpeed average speed of the reading
+     * @param timestamp timestamp of the reading
+     */
+    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
+    {
+      this.stationId = stationId;
+      this.avgSpeed = avgSpeed;
+      this.timestamp = timestamp;
+    }
+
+    public void setAvgSpeed(@Nullable Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public void setStationId(@Nullable String stationId)
+    {
+      this.stationId = stationId;
+    }
+
+    public void setTimestamp(@Nullable Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Nullable
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public String getStationId()
+    {
+      return this.stationId;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return this.avgSpeed;
+    }
+
+    /**
+     * Compares two readings by timestamp. The timestamp is nullable, so a missing timestamp is
+     * ordered before any present one (and two missing timestamps are equal) instead of failing
+     * with a NullPointerException while unboxing.
+     */
+    @Override
+    public int compareTo(StationSpeed other)
+    {
+      if (this.timestamp == null) {
+        return other.timestamp == null ? 0 : -1;
+      }
+      if (other.timestamp == null) {
+        return 1;
+      }
+      return Long.compare(this.timestamp, other.timestamp);
+    }
+  }
+
+  /**
+   * This class holds information about a route's speed/slowdown.
+   */
+  static class RouteInfo
+  {
+    // Name of the route.
+    @Nullable
+    String route;
+    // Average speed computed for the route.
+    @Nullable
+    Double avgSpeed;
+    // Whether the route was classified as slowing down.
+    @Nullable
+    Boolean slowdownEvent;
+
+    /**
+     * Creates an empty instance; all fields stay null.
+     */
+    public RouteInfo()
+    {
+
+    }
+
+    /**
+     * @param route name of the route
+     * @param avgSpeed average speed for the route
+     * @param slowdownEvent whether a slowdown was detected
+     */
+    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
+    {
+      this.route = route;
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public String getRoute()
+    {
+      return this.route;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return this.avgSpeed;
+    }
+
+    public Boolean getSlowdownEvent()
+    {
+      return this.slowdownEvent;
+    }
+  }
+
+  /**
+   * Reads the timestamp from the first comma-separated field of the input line and wraps the whole,
+   * unmodified line in a {@link Tuple.TimestampedTuple} carrying that timestamp.
+   */
+  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      String rawTimestamp = tryParseTimestamp(input.split(","));
+      long parsedTimestamp = Long.parseLong(rawTimestamp);
+      return new Tuple.TimestampedTuple<>(parsedTimestamp, input);
+    }
+  }
+
+  /**
+   * Keeps only 'main line' readings of stations that belong to one of the predefined routes and emits
+   * them as (route, station speed) pairs; every other reading produces no output.
+   */
+  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
+  {
+
+    @Override
+    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
+    {
+      ArrayList<KeyValPair<String, StationSpeed>> routed = new ArrayList<>();
+      String[] fields = input.getValue().split(",");
+
+      // For this analysis, use only 'main line' station types
+      if (!"ML".equals(tryParseStationType(fields))) {
+        return routed;
+      }
+
+      Double avgSpeed = tryParseAvgSpeed(fields);
+      String stationId = tryParseStationId(fields);
+      // For this simple example, filter out everything but some hardwired routes.
+      if (avgSpeed == null || stationId == null || !sdStations.containsKey(stationId)) {
+        return routed;
+      }
+
+      StationSpeed reading = new StationSpeed(stationId, avgSpeed, input.getTimestamp());
+      // The tuple key is the 'route' name stored in the 'sdStations' hash.
+      KeyValPair<String, StationSpeed> keyedReading = new KeyValPair<>(sdStations.get(stationId), reading);
+      routed.add(keyedReading);
+      return routed;
+    }
+  }
+
+  /**
+   * For a given route, track average speed for the window. Calculate whether
+   * traffic is currently slowing down, via a predefined threshold. If a supermajority of
+   * speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
+   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+   * Emits at most one (route, RouteInfo) tuple per input, and none when no reading has a speed.
+   */
+  static class GatherStats
+      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
+  {
+    @Override
+    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
+    {
+      ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
+      String route = input.getValue().getKey();
+      double speedSum = 0.0;
+      int speedCount = 0;
+      int speedups = 0;
+      int slowdowns = 0;
+      // Work on a copy so that sorting does not reorder the list carried by the input tuple.
+      List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
+      // StationSpeeds sort by embedded timestamp.
+      Collections.sort(infoList);
+      // Last seen speed per station id, used to classify each reading against the previous one.
+      Map<String, Double> prevSpeeds = new HashMap<>();
+      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
+      for (StationSpeed item : infoList) {
+        Double speed = item.getAvgSpeed();
+        if (speed != null) {
+          speedSum += speed;
+          speedCount++;
+          Double lastSpeed = prevSpeeds.get(item.getStationId());
+          if (lastSpeed != null) {
+            // An unchanged speed is counted as a slowdown as well.
+            if (lastSpeed < speed) {
+              speedups += 1;
+            } else {
+              slowdowns += 1;
+            }
+          }
+          prevSpeeds.put(item.getStationId(), speed);
+        }
+      }
+      if (speedCount == 0) {
+        // No average to compute.
+        return result;
+      }
+      double speedAvg = speedSum / speedCount;
+      // Slowdown when slowdowns are at least twice the speedups (also true when both are zero).
+      boolean slowdownEvent = slowdowns >= 2 * speedups;
+      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
+      result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
+      return result;
+    }
+  }
+
+  /**
+   * Output Pojo class for outputting result to JDBC.
+   * Holds a key together with its average speed, slowdown flag and timestamp.
+   */
+  static class OutputPojo
+  {
+    private Double avgSpeed;
+    private Boolean slowdownEvent;
+    private String key;
+    private Long timestamp;
+
+    /**
+     * Creates an empty instance; all fields stay null until set.
+     */
+    public OutputPojo()
+    {
+    }
+
+    /**
+     * @param avgSpeed average speed
+     * @param slowdownEvent whether a slowdown was detected
+     * @param key key of the record
+     * @param timestamp timestamp of the record
+     */
+    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
+    {
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+
+    /**
+     * @return key, average speed, slowdown flag and timestamp joined by {@code " + "}
+     */
+    @Override
+    public String toString()
+    {
+      return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
+    }
+
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setAvgSpeed(Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return avgSpeed;
+    }
+
+    public void setKey(String key)
+    {
+      this.key = key;
+    }
+
+    public String getKey()
+    {
+      return key;
+    }
+
+    public void setSlowdownEvent(Boolean slowdownEvent)
+    {
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public Boolean getSlowdownEvent()
+    {
+      return slowdownEvent;
+    }
+
+  }
+
+  /**
+   * Terminal operator that stores every received OutputPojo in a static map, keyed by
+   * (timestamp, key) and mapped to (average speed, slowdown flag). A later tuple with the same
+   * key pair replaces the earlier entry.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
+
+    /**
+     * @return the shared result map (the live map, not a copy)
+     */
+    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
+    {
+      return result;
+    }
+
+    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
+    {
+      @Override
+      public void process(OutputPojo tuple)
+      {
+        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
+      }
+    };
+  }
+
+  /**
+   * Converts a timestamped (route, RouteInfo) pair into an OutputPojo carrying the route's average
+   * speed, its slowdown flag, the pair's key and the tuple's timestamp.
+   */
+  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
+    {
+      KeyValPair<String, RouteInfo> keyedInfo = input.getValue();
+      RouteInfo info = keyedInfo.getValue();
+      return new OutputPojo(info.getAvgSpeed(), info.getSlowdownEvent(), keyedInfo.getKey(), input.getTimestamp());
+    }
+  }
+
+
+  /**
+   * This composite transformation extracts speed info from traffic station readings.
+   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
+   * Lastly, it formats the results for JDBC.
+   */
+  static class TrackSpeed extends
+      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
+  {
+    /**
+     * @param inputStream windowed stream of (route, station speed) pairs
+     * @return windowed stream of formatted per-route statistics
+     */
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
+    {
+      // Apply a GroupByKey transform to collect a list of all station
+      // readings for a given route.
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
+          inputStream
+          .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
+            {
+              // The reading's own timestamp becomes the tuple timestamp.
+              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
+            }
+          }, name("GroupByKey"));
+
+      // Analyze 'slowdown' over the route readings.
+      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
+          .flatMap(new GatherStats(), name("GatherStats"));
+
+      // Format the results for writing to JDBC table.
+      WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
+
+      return results;
+    }
+  }
+
+
+  /**
+   * Parses the average speed from the field at index 3.
+   *
+   * @param inputItems the comma-separated fields of one reading
+   * @return the parsed speed, or null when the field is missing or is not a valid number
+   */
+  private static Double tryParseAvgSpeed(String[] inputItems)
+  {
+    // Check for a missing value explicitly instead of catching NullPointerException.
+    String rawSpeed = inputItems == null ? null : tryParseString(inputItems, 3);
+    if (rawSpeed == null) {
+      return null;
+    }
+    try {
+      return Double.parseDouble(rawSpeed);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+  // Returns the station type, read from the field at index 2 via tryParseString.
+  private static String tryParseStationType(String[] inputItems)
+  {
+    return tryParseString(inputItems, 2);
+  }
+
+  // Returns the station id, read from the field at index 1 via tryParseString.
+  private static String tryParseStationId(String[] inputItems)
+  {
+    return tryParseString(inputItems, 1);
+  }
+
+  // Returns the raw timestamp string, read from the field at index 0 via tryParseString.
+  private static String tryParseTimestamp(String[] inputItems)
+  {
+    return tryParseString(inputItems, 0);
+  }
+
+  /**
+   * Returns the field at the given position, or null when the array has no such position.
+   *
+   * @param inputItems the fields of one reading
+   * @param index zero-based position of the wanted field
+   * @return the field at {@code index}, or null if {@code index} is outside the array
+   */
+  private static String tryParseString(String[] inputItems, int index)
+  {
+    // A valid index is strictly smaller than the length; index == length must yield null, not throw.
+    return index >= 0 && index < inputItems.length ? inputItems[index] : null;
+  }
+
+  /**
+   * Builds the hard-wired lookup from sensor station ID to the small San Diego 'route' it belongs to.
+   *
+   * @return a map from station ID to route name
+   */
+  private static Map<String, String> buildStationInfo()
+  {
+    Map<String, String> routeByStation = new Hashtable<>();
+    // from freeway 805 S
+    routeByStation.put("1108413", "SDRoute1");
+    // from freeway 78 E
+    routeByStation.put("1108699", "SDRoute2");
+    routeByStation.put("1108702", "SDRoute2");
+    return routeByStation;
+  }
+
+  /**
+   * A dummy generator to generate some traffic information.
+   * Every emitted line has the form {@code timestamp,stationId,stationType,speed}; one line is
+   * produced for each combination of station type and station id per call to {@link #emitTuples()}.
+   */
+  public static class InfoGen extends BaseOperator implements InputOperator
+  {
+    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
+    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
+    // Lower bound of the generated speeds; a random amount below 20 is added on top.
+    private double ave = 55.0;
+    private long timestamp;
+    // Generated reading times fall within this range after the time captured in setup().
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    // Number of tuples emitted since the last setup(); static so it can be read without the instance.
+    private static int tupleCount = 0;
+
+    /**
+     * @return the number of tuples emitted since the operator was last set up
+     */
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      tupleCount = 0;
+      timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      for (String stationType : stationTypes) {
+        for (int stationID : stationIDs) {
+          double speed = Math.random() * 20 + ave;
+          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
+          try {
+            output.emit(time + "," + stationID + "," + stationType + "," + speed);
+            tupleCount++;
+
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            // Do not lose the interrupt: restore the flag for the caller and stop this batch.
+            Thread.currentThread().interrupt();
+            return;
+          } catch (Exception e) {
+            // Ignore it
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the application: generated readings are timestamped, reduced to per-route station speeds,
+   * put into sliding time windows, analyzed by TrackSpeed, printed and finally collected.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    InfoGen infoGen = new InfoGen();
+    Collector collector = new Collector();
+
+    // Create a stream from the input operator.
+    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
+
+        // Extract the timestamp from the input and wrap it into a TimestampedTuple.
+        .map(new ExtractTimestamps(), name("ExtractTimestamps"));
+
+    stream
+        // Extract the average speed of a station.
+        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
+
+        // Apply window and trigger option: WINDOW_DURATION-minute windows sliding every
+        // WINDOW_SLIDE_EVERY minutes, with early firings every 5000 ms and accumulating panes.
+        .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
+
+        // Apply TrackSpeed composite transformation to compute the route information.
+        .addCompositeStreams(new TrackSpeed())
+
+        // print the result to console.
+        .print(name("console"))
+        .endWith(collector, collector.input, name("Collector"))
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
new file mode 100644
index 0000000..6332c66
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Auto Complete Hashtag Example with real-time Twitter input. In order to run this application, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter that information
+ * accordingly in /resources/META-INF/properties.xml.
+ *
+ * Authentication requires the following four pieces of information:
+ * your application consumer key,
+ * your application consumer secret,
+ * your Twitter access token, and
+ * your Twitter access token secret.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "TwitterAutoComplete")
+public class TwitterAutoComplete implements StreamingApplication
+{
+  /**
+   * Utility that checks whether every character of a string can be encoded as US-ASCII.
+   */
+  public static class StringUtils
+  {
+    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
+    // CharsetEncoder is not safe for concurrent use, so calls on the shared instance are serialized.
+    public static synchronized boolean isAscii(String v)
+    {
+      return encoder.canEncode(v);
+    }
+  }
+
+  /**
+   * FlatMap function that extracts all hashtags ('#' followed by non-whitespace characters) from a tweet, dropping the leading '#'.
+   */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+"); // compiled once instead of once per tweet
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Flat top-N computation (lower latency, but more expensive): expands each candidate into prefixes and accumulates the top candidates per prefix.
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.candidatesPerPrefix = candidatesPerPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<CompletionCandidate> input)
+    {
+      // TODO: Should be removed after Auto-wrapping is supported.
+      Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate> wrapInGlobalWindow =
+          new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+            {
+              return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
+            }
+          };
+      TopNByKey topN = new TopNByKey();
+      topN.setN(candidatesPerPrefix);
+      return input.<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
+          .accumulateByKey(topN, wrapInGlobalWindow, name("TopNByKey"));
+    }
+  }
+
+  /**
+   * FlatMap function that emits, for the hashtag of the input CompletionCandidate, one KeyValPair per prefix:
+   * the lower-cased prefix (length minPrefix up to min(word length, maxPrefix)) mapped to the candidate itself.
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    public AllPrefixes()
+    {
+      this(0);
+    }
+
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.maxPrefix = maxPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      String word = input.getValue();
+      List<KeyValPair<String, CompletionCandidate>> prefixes = new LinkedList<>();
+      for (int end = minPrefix, last = Math.min(word.length(), maxPrefix); end <= last; end++) {
+        prefixes.add(new KeyValPair<>(word.substring(0, end).toLowerCase(), input));
+      }
+      return prefixes;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes a windowed stream of tokens, turns each token and its count into a
+   * CompletionCandidate, and returns the top candidatesPerPrefix candidates per prefix via ComputeTopFlat.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final boolean recursive; // NOTE(review): stored but never read by compose() - confirm whether it is still needed
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+      // Feed (token, 1L) pairs to countByKey, then convert each resulting (token, count) pair into a CompletionCandidate.
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("Hashtag Count"))
+          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("KeyValPair to CompletionCandidate"));
+
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1)); // minimum prefix length is fixed at 1
+
+    }
+  }
+
+  /**
+   * FilterFunction that keeps a tweet only when its text consists entirely of ASCII characters.
+   */
+  static class ASCIIFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      return TwitterAutoComplete.StringUtils.isAscii(input);
+    }
+  }
+
+  /**
+   * Populate the dag with High-Level API: sample tweets, keep the ASCII-only ones, extract hashtags, compute completions.
+   * @param dag the DAG to populate
+   * @param conf the configuration (not used by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TwitterSampleInput tweetSource = new TwitterSampleInput();
+
+    // Keep the ASCII-only tweets and split them into hashtags.
+    ApexStream<String> hashtags = StreamFactory.fromInput(tweetSource, tweetSource.text, name("tweetSampler"))
+        .filter(new ASCIIFilter(), name("ACSII Filter"))
+        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
+
+    // Global window whose accumulated panes fire early every ten seconds.
+    TriggerOption everyTenSeconds = new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10));
+    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> completions =
+        hashtags.window(new WindowOption.GlobalWindow(), everyTenSeconds)
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
+    completions.populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..bfdb268
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is over a given length, generates a string containing the
+ * list of play names in which that word appears
+ *
+ * <p>Concepts: the combine transform, which lets you combine the values in a
+ * key-grouped Collection
+ *
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "CombinePerKeyExamples")
+public class CombinePerKeyExamples implements StreamingApplication
+{
+  // Use the shakespeare public BigQuery sample
+  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 0;
+
+  /**
+   * Map function over the rows of the input table: outputs (word, play_name) when the word has at least
+   * MIN_WORD_LENGTH characters, and null otherwise.
+   */
+  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
+  {
+
+    @Override
+    public KeyValPair<String, String> f(SampleBean input)
+    {
+      String corpusName = input.getCorpus();
+      String candidate = input.getWord();
+      // Guard clause: words shorter than the threshold produce no pair.
+      if (candidate.length() < MIN_WORD_LENGTH) {
+        return null;
+      }
+      return new KeyValPair<>(candidate, corpusName);
+    }
+  }
+
+
+  /** Map function that copies the key and value of each windowed pair into a {@link SampleBean} (word, corpus). */
+  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
+  {
+    @Override
+    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
+    {
+      // Unwrap the windowed tuple once, then build the bean from its key and value.
+      KeyValPair<String, String> wordToPlays = input.getValue();
+      return new SampleBean(wordToPlays.getKey(), wordToPlays.getValue());
+    }
+  }
+
+  /**
+   * Reduce function that joins two strings into one, separated by a comma and a space.
+   */
+  public static class Concat extends ReduceFn<String>
+  {
+    @Override
+    public String reduce(String input1, String input2)
+    {
+      return new StringBuilder().append(input1).append(", ").append(input2).toString();
+    }
+  }
+
+  /**
+   * Composite transform over a stream of SampleBean objects: maps each bean with ExtractLargeWordsFn, applies a
+   * global window, reduces the resulting pairs by key with Concat, and maps every reduced pair back to a
+   * SampleBean with FormatShakespeareOutputFn.
+   */
+  private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
+  {
+
+    @Override
+    public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+    {
+      return inputStream
+          // Extract (word, play name) pairs from the input SampleBean stream.
+          .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
+
+          // Global window with accumulating panes fired early; the argument 1 is presumably a tuple count - TODO confirm.
+          .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+          // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
+          .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
+            {
+              return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
+            }
+          }, name("Concat"))
+
+          // Format the output back to a SampleBean object.
+          .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
+    }
+  }
+
+
+  /**
+   * A Java bean class that holds a word together with the corpus written by Shakespeare in which it appears.
+   */
+  public static class SampleBean
+  {
+    private String word;
+    private String corpus;
+
+    public SampleBean()
+    {
+      // Leaves both fields null.
+    }
+
+    public SampleBean(String word, String corpus)
+    {
+      this.word = word;
+      this.corpus = corpus;
+    }
+
+    public String getWord()
+    {
+      return word;
+    }
+
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    public String getCorpus()
+    {
+      return corpus;
+    }
+
+    public void setCorpus(String corpus)
+    {
+      this.corpus = corpus;
+    }
+
+    @Override
+    public String toString()
+    {
+      StringBuilder text = new StringBuilder();
+      text.append(word).append(" : ").append(corpus);
+      return text.toString();
+    }
+  }
+
+  /**
+   * A dummy info generator that emits one {@link SampleBean} per word/corpus combination, once after each
+   * setup, to mimic reading from real 'Shakespeare' data.
+   */
+  public static class SampleInput extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort<>();
+    private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
+    private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
+    private transient boolean emitted; // per instance (was static): instances in one JVM must not suppress each other
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      emitted = false;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (emitted) {
+        return;
+      }
+      for (String word : words) {
+        for (String corpus : corpuses) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+          }
+          // Emitted outside the try block so that failures in emit() are no longer silently ignored.
+          beanOutput.emit(new SampleBean(word, corpus));
+        }
+      }
+      emitted = true;
+    }
+  }
+
+  /** Sink that stores every received bean in a static list and flags completion once a bean whose word is "F" arrives. */
+  public static class Collector extends BaseOperator
+  {
+    private static volatile List<SampleBean> result; // volatile: assigned in setup(), read via getResult() from other threads
+    private static volatile boolean done = false; // volatile: set by the operator thread, polled via isDone()
+
+    public static List<SampleBean> getResult()
+    {
+      return result;
+    }
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      result = new ArrayList<>();
+      done = false;
+    }
+
+    public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
+    {
+      @Override
+      public void process(SampleBean tuple)
+      {
+        result.add(tuple); // record first, so the triggering bean is already stored when done becomes visible
+        if ("F".equals(tuple.getWord())) { // constant-first comparison tolerates a null word
+          done = true;
+        }
+      }
+    };
+  }
+
+  /**
+   * Populate dag using High-Level API: sample input, PlaysForWord transform, console output, collector.
+   * @param dag the DAG to populate
+   * @param conf the configuration (not used by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SampleInput source = new SampleInput();
+    Collector sink = new Collector();
+    // Stream of beans produced by the sample input operator.
+    ApexStream<SampleBean> beans = StreamFactory.fromInput(source, source.beanOutput, name("input"));
+    beans.addCompositeStreams(new PlaysForWord())
+      .print(name("console"))
+      .endWith(sink, sink.input, name("Collector"))
+      .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
new file mode 100644
index 0000000..4df5fe7
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam DeDupExample.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "DeDupExample")
+public class DeDupExample implements StreamingApplication
+{
+
+  /** Sink that keeps the most recently received tuple in static state and flags completion once its value contains "bye". */
+  public static class Collector extends BaseOperator
+  {
+    private static volatile Tuple.WindowedTuple<List<String>> result; // volatile: written by the operator thread, read via getResult()
+    private static volatile boolean done = false; // volatile: set by the operator thread, polled via isDone()
+
+    public static Tuple.WindowedTuple<List<String>> getResult()
+    {
+      return result;
+    }
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      result = new Tuple.WindowedTuple<>();
+      done = false;
+    }
+
+    public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<List<String>> tuple)
+      {
+        result = tuple; // publish the tuple before done, so a reader that sees done also sees this result
+        if (tuple.getValue().contains("bye")) {
+          done = true;
+        }
+      }
+    };
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+
+    // Splits a line of text into words on runs of punctuation and/or whitespace.
+    Function.FlatMapFunction<String, String> extractWords = new Function.FlatMapFunction<String, String>()
+    {
+      @Override
+      public Iterable<String> f(String input)
+      {
+        return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+      }
+    };
+
+    // Changes a word to lower case.
+    Function.MapFunction<String, String> toLowerCase = new Function.MapFunction<String, String>()
+    {
+      @Override
+      public String f(String input)
+      {
+        return input.toLowerCase();
+      }
+    };
+
+    // Create a stream that reads from files in a local folder line by line, then extract and normalize the words.
+    ApexStream<String> words = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+        .flatMap(extractWords, name("ExtractWords"))
+        .map(toLowerCase, name("ToLowerCase"));
+
+    // Apply window and trigger option, remove the duplicate words, print the result and pass it to the collector.
+    words.window(new WindowOption.GlobalWindow(),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
+        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates"))
+        .print(name("console"))
+        .endWith(collector, collector.input)
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
new file mode 100644
index 0000000..834964c
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * Tuple class for JDBC input of {@link MaxPerKeyExamples}: a date (month, day, year) and its mean temperature.
+ * @since 3.5.0
+ */
+public class InputPojo
+{
+  private int month;
+  private int day;
+  private int year;
+  private double meanTemp;
+
+  public int getMonth()
+  {
+    return month;
+  }
+
+  public void setMonth(int month)
+  {
+    this.month = month;
+  }
+
+  public int getDay()
+  {
+    return day;
+  }
+
+  public void setDay(int day)
+  {
+    this.day = day;
+  }
+
+  public int getYear()
+  {
+    return year;
+  }
+
+  public void setYear(int year)
+  {
+    this.year = year;
+  }
+
+  public double getMeanTemp()
+  {
+    return meanTemp;
+  }
+
+  public void setMeanTemp(double meanTemp)
+  {
+    this.meanTemp = meanTemp;
+  }
+
+  @Override
+  public String toString()
+  {
+    return new StringBuilder("PojoEvent [month=").append(getMonth()).append(", day=").append(getDay())
+        .append(", year=").append(getYear()).append(", meanTemp=").append(getMeanTemp()).append("]").toString();
+  }
+}