You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:20 UTC

[4/6] apex-malhar git commit: Added Beam Examples and Implementations of Accumulation.

Added Beam Examples and Implementations of Accumulation.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dcca7752
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dcca7752
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dcca7752

Branch: refs/heads/master
Commit: dcca7752a8ee966d67602a1b7cb8fbacdb8ed59d
Parents: 266b041
Author: Shunxin <lu...@hotmail.com>
Authored: Wed Aug 24 13:12:20 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Aug 25 09:26:03 2016 -0700

----------------------------------------------------------------------
 demos/highlevelapi/pom.xml                      |  98 +++-
 .../malhar/stream/sample/MinimalWordCount.java  | 126 +++++
 .../malhar/stream/sample/WindowedWordCount.java | 277 ++++++++++
 .../stream/sample/complete/AutoComplete.java    | 308 +++++++++++
 .../sample/complete/CompletionCandidate.java    |  87 ++++
 .../stream/sample/complete/PojoEvent.java       |  44 ++
 .../sample/complete/StreamingWordExtract.java   | 160 ++++++
 .../stream/sample/complete/TopNByKey.java       | 118 +++++
 .../sample/complete/TopWikipediaSessions.java   | 340 ++++++++++++
 .../stream/sample/complete/TrafficRoutes.java   | 521 +++++++++++++++++++
 .../sample/complete/TwitterAutoComplete.java    | 251 +++++++++
 .../sample/cookbook/CombinePerKeyExamples.java  | 212 ++++----
 .../stream/sample/cookbook/DeDupExample.java    | 124 +++++
 .../stream/sample/cookbook/InputPojo.java       |  76 +++
 .../sample/cookbook/MaxPerKeyExamples.java      | 203 ++++++++
 .../stream/sample/cookbook/OutputPojo.java      |  54 ++
 .../stream/sample/cookbook/TriggerExample.java  | 137 +++--
 .../src/main/resources/META-INF/properties.xml  | 141 +++++
 .../stream/sample/MinimalWordCountTest.java     |  61 +++
 .../stream/sample/WindowedWordCountTest.java    |  90 ++++
 .../sample/complete/AutoCompleteTest.java       |  66 +++
 .../complete/StreamingWordExtractTest.java      | 144 +++++
 .../complete/TopWikipediaSessionsTest.java      |  73 +++
 .../sample/complete/TrafficRoutesTest.java      |  66 +++
 .../complete/TwitterAutoCompleteTest.java       |  66 +++
 .../cookbook/CombinePerKeyExamplesTest.java     |  56 ++
 .../sample/cookbook/DeDupExampleTest.java       |  59 +++
 .../sample/cookbook/MaxPerKeyExamplesTest.java  | 210 ++++++++
 .../src/test/resources/data/word.txt            |   2 +
 .../src/test/resources/log4j.properties         |  45 ++
 .../src/test/resources/sampletweets.txt         | 207 ++++++++
 .../src/test/resources/wordcount/word.txt       |   8 +
 demos/pom.xml                                   |  13 +-
 .../lib/window/impl/accumulation/Average.java   |  64 +++
 .../lib/window/impl/accumulation/Count.java     |  61 +++
 .../lib/window/impl/accumulation/FoldFn.java    |  65 +++
 .../lib/window/impl/accumulation/Group.java     |  63 +++
 .../lib/window/impl/accumulation/Max.java       |  75 +++
 .../lib/window/impl/accumulation/Min.java       |  76 +++
 .../lib/window/impl/accumulation/ReduceFn.java  |  65 +++
 .../impl/accumulation/RemoveDuplicates.java     |  72 +++
 .../lib/window/impl/accumulation/SumDouble.java |  60 +++
 .../lib/window/impl/accumulation/SumFloat.java  |  60 +++
 .../lib/window/impl/accumulation/SumInt.java    |  60 +++
 .../lib/window/impl/accumulation/SumLong.java   |  60 +++
 .../lib/window/impl/accumulation/TopN.java      | 106 ++++
 .../lib/window/impl/accumulation/TopNByKey.java | 114 ++++
 .../window/impl/accumulation/AverageTest.java   |  41 ++
 .../window/impl/accumulation/FoldFnTest.java    | 129 +++++
 .../lib/window/impl/accumulation/GroupTest.java |  42 ++
 .../lib/window/impl/accumulation/MaxTest.java   |  53 ++
 .../lib/window/impl/accumulation/MinTest.java   |  53 ++
 .../window/impl/accumulation/ReduceFnTest.java  |  50 ++
 .../impl/accumulation/RemoveDuplicatesTest.java |  42 ++
 .../lib/window/impl/accumulation/SumTest.java   |  57 ++
 .../window/impl/accumulation/TopNByKeyTest.java |  75 +++
 .../apex/malhar/stream/api/WindowedStream.java  |   4 +-
 .../stream/api/impl/ApexWindowedStreamImpl.java |  11 +-
 .../stream/api/impl/accumulation/Count.java     |  61 ---
 .../stream/api/impl/accumulation/FoldFn.java    |  65 ---
 .../stream/api/impl/accumulation/ReduceFn.java  |  65 ---
 .../stream/api/impl/accumulation/TopN.java      | 107 ----
 stream/src/test/resources/words/word.txt        |   2 +
 63 files changed, 5820 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml
index c669681..cde0c83 100644
--- a/demos/highlevelapi/pom.xml
+++ b/demos/highlevelapi/pom.xml
@@ -34,21 +34,107 @@
     <version>3.5.0-SNAPSHOT</version>
   </parent>
 
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.9.1</version>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <phase>package</phase>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>target/${project.artifactId}-${project.version}.apa</file>
+                  <type>apa</type>
+                </artifact>
+              </artifacts>
+              <skipAttach>false</skipAttach>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
 
   <dependencies>
     <dependency>
-      <groupId>cglib</groupId>
-      <artifactId>cglib</artifactId>
-      <version>3.2.1</version>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <!-- required by twitter demo -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>4.0.4</version>
+    </dependency>
+    <dependency>
+      <!-- required by twitter demo -->
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>4.0.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-stream</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>2.3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>1.4.192</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
       <version>2.9.1</version>
     </dependency>
+    <dependency>
+      <!--This dependency is needed for StreamingWordExtractTest-->
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>commons-compiler</artifactId>
+      <version>2.7.8</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <!--This dependency is needed for StreamingWordExtractTest-->
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+      <version>2.7.8</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
new file mode 100644
index 0000000..671cc72
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam MinimalWordCount Example
+ */
+@ApplicationAnnotation(name = "MinimalWordCount")
+public class MinimalWordCount implements StreamingApplication
+{
+  public static class Collector extends BaseOperator
+  {
+    static Map<String, Long> result;
+    private static boolean done = false;
+  
+    public static boolean isDone()
+    {
+      return done;
+    }
+  
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      result = new HashMap<>();
+    }
+    
+    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
+    {
+      @Override
+      public void process(KeyValPair<String, Long> tuple)
+      {
+        if (tuple.getKey().equals("bye")) {
+          done = true;
+        }
+        result.put(tuple.getKey(), tuple.getValue());
+      }
+    };
+  }
+  
+  /**
+   * Populate the dag using High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+    // Create a stream reading from a file line by line using StreamFactory.
+    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+        // Use a flatmap transformation to extract words from the incoming stream of lines.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[^a-zA-Z']+"));
+          
+          }
+        }, name("ExtractWords"))
+        // Apply windowing to the stream for counting, in this case, the window option is global window.
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        // Count the appearances of every word.
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L));
+          }
+        }, name("countByKey"))
+        // Format the counting result to a readable format by unwrapping the tuples.
+        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
+        {
+          @Override
+          public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+          {
+            return input.getValue();
+          }
+        }, name("FormatResults"))
+        // Print the result.
+        .print()
+        // Attach a collector to the stream to collect results.
+        .endWith(collector, collector.input, name("Collector"))
+        // populate the dag using the stream.
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
new file mode 100644
index 0000000..6a6777e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.joda.time.Duration;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam WindowedWordCount Example.
+ */
+@ApplicationAnnotation(name = "WindowedWordCount")
+public class WindowedWordCount implements StreamingApplication
+{
+  static final int WINDOW_SIZE = 1;  // Default window duration in minutes
+  
+  /**
+   * An input operator that reads a file and emits it line by line to downstream, with a time gap between
+   * every two lines.
+   */
+  public static class TextInput extends BaseOperator implements InputOperator
+  {
+    private static boolean done = false;
+    
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    
+    private transient BufferedReader reader;
+  
+    public static boolean isDone()
+    {
+      return done;
+    }
+  
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+    
+    private void initReader()
+    {
+      try {
+        InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt");
+        reader = new BufferedReader(new InputStreamReader(resourceStream));
+      } catch (Exception ex) {
+        throw Throwables.propagate(ex);
+      }
+    }
+    
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+    
+    @Override
+    public void emitTuples()
+    {
+      try {
+        String line = reader.readLine();
+        if (line == null) {
+          done = true; // end of input reached; observable through isDone()
+          // Leave the reader open (teardown() closes it): readLine() on a closed reader throws IOException.
+          Thread.sleep(1000);
+        } else {
+          this.output.emit(line);
+        }
+        Thread.sleep(50); // time gap between two consecutive lines
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      } catch (InterruptedException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+  
+  public static class Collector extends BaseOperator
+  {
+    private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+  
+    public static Map<KeyValPair<Long, String>, Long> getResult()
+    {
+      return result;
+    }
+  
+    public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
+    {
+      @Override
+      public void process(PojoEvent tuple)
+      {
+        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
+      }
+    };
+  }
+  
+  /**
+   * A Pojo Tuple class used for outputting result to JDBC.
+   */
+  public static class PojoEvent
+  {
+    private String word;
+    private long count;
+    private long timestamp;
+  
+    @Override
+    public String toString()
+    {
+      return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
+    }
+  
+    public String getWord()
+    {
+      return word;
+    }
+  
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+  
+    public long getCount()
+    {
+      return count;
+    }
+  
+    public void setCount(long count)
+    {
+      this.count = count;
+    }
+  
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+  
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  }
+  
+  /**
+   * A map function that wraps the input string with a randomly generated timestamp.
+   */
+  public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    private final Long minTimestamp;
+    
+    AddTimestampFn()
+    {
+      this.minTimestamp = System.currentTimeMillis();
+    }
+    
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      // Generate a timestamp that falls within RAND_RANGE (10 minutes) after minTimestamp.
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+
+      return new Tuple.TimestampedTuple<>(randomTimestamp, input);
+    }
+  }
+  
+  /** A MapFunction that converts a Word and Count into a PojoEvent. */
+  public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
+  {
+    @Override
+    public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      PojoEvent row = new PojoEvent();
+      row.setTimestamp(input.getTimestamp());
+      row.setCount(input.getValue().getValue());
+      row.setWord(input.getValue().getKey());
+      return row;
+    }
+  }
+  
+  /**
+   * Populate dag with High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TextInput input = new TextInput();
+    Collector collector = new Collector();
+    
+    // Create stream from the TextInput operator.
+    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
+      
+        // Extract all the words from the input line of text.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+          }
+        }, name("ExtractWords"))
+      
+        // Wrap the word with a randomly generated timestamp.
+        .map(new AddTimestampFn(), name("AddTimestampFn"));
+    
+   
+    // apply window and trigger option.
+    // TODO: change trigger option to atWaterMark when available.
+    WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
+        .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
+    
+    
+    WindowedStream<PojoEvent> wordCounts =
+        // Perform a countByKey transformation to count the appearance of each word in every time window.
+        windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+          {
+            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
+              new KeyValPair<String, Long>(input.getValue(), 1L));
+          }
+        }, name("count words"))
+          
+        // Format the output and print out the result.
+        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print();
+    
+    wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
new file mode 100644
index 0000000..29c8cf9
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * An example that computes the most popular hash tags
+ * for every prefix, which can be used for auto-completion.
+ * This application is identical to TwitterAutoComplete, except it's
+ * reading from a file. This application is mainly for local testing
+ * purpose.
+ *
+ * <p>This will update the datastore every 10 seconds based on the last
+ * 30 minutes of data received.
+ */
+@ApplicationAnnotation(name = "AutoComplete")
+public class AutoComplete implements StreamingApplication
+{
+
+  /**
+   * A dummy Twitter input operator. It reads from a text file containing some tweets and outputs a line every
+   * 50 milliseconds.
+   */
+  public static class TweetsInput extends BaseOperator implements InputOperator
+  {
+    private static boolean done = false;
+    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+    private transient BufferedReader reader;
+  
+    public static boolean isDone()
+    {
+      return done;
+    }
+  
+    @Override
+    public void setup(OperatorContext context)
+    {
+      done = false;
+      initReader();
+    }
+
+    private void initReader()
+    {
+      try {
+        InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
+        reader = new BufferedReader(new InputStreamReader(resourceStream));
+      } catch (Exception ex) {
+        throw Throwables.propagate(ex);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      IOUtils.closeQuietly(reader);
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      try {
+        String line = reader.readLine();
+        if (line == null) {
+          done = true; // end of input reached; observable through isDone()
+          // Leave the reader open (teardown() closes it): readLine() on a closed reader throws IOException.
+          Thread.sleep(1000);
+        } else {
+          this.output.emit(line);
+        }
+        Thread.sleep(50); // time gap between two consecutive lines
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+      }
+    }
+  }
+  
+  public static class Collector extends BaseOperator
+  {
+    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+  
+    public static Map<String, List<CompletionCandidate>> getResult()
+    {
+      return result;
+    }
+  
+    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
+      {
+        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
+      }
+    };
+  }
+
+  /**
+   * FlatMap Function to extract all hashtags from a text form tweet.
+   */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+"); // compiled once instead of once per tweet
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1)); // drop the leading '#'
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive.
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+        WindowedStream<CompletionCandidate> input)
+    {
+      return input
+        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
+        .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
+          CompletionCandidate>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+          {
+            // TODO: Should be removed after Auto-wrapping is supported. 
+            return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+          }
+        });
+    }
+  }
+
+  /**
+   * FlatMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
+   * KeyValPairs of the prefix and the CompletionCandidate
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    public AllPrefixes()
+    {
+      this(0, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+      String word = input.getValue();
+      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
+
+        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("countByKey")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("ToCompletionCandidate"));
+      // Pass the configured candidate count through instead of a hard-coded 10.
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+
+    }
+  }
+
+  /**
+   * Populate the dag with High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TweetsInput input = new TweetsInput();
+    Collector collector = new Collector();
+
+    WindowOption windowOption = new WindowOption.GlobalWindow();
+
+    ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
+        .flatMap(new ExtractHashtags());
+    
+    tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input, name("collector"))
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
new file mode 100644
index 0000000..8a7113e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Class used to store tag-count pairs in Auto Complete Demo.
+ */
+public class CompletionCandidate implements Comparable<CompletionCandidate>
+{
+  private long count;
+  private String value;
+
+  public CompletionCandidate(String value, long count)
+  {
+    this.value = value;
+    this.count = count;
+  }
+
+  public long getCount()
+  {
+    return count;
+  }
+
+  public String getValue()
+  {
+    return value;
+  }
+
+  // Empty constructor required for Kryo.
+  public CompletionCandidate()
+  {
+    
+  }
+
+  @Override
+  public int compareTo(CompletionCandidate o)
+  {
+    if (this.count < o.count) {
+      return -1;
+    } else if (this.count == o.count) {
+      return this.value.compareTo(o.value);
+    } else {
+      return 1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object other)
+  {
+    if (other instanceof CompletionCandidate) {
+      CompletionCandidate that = (CompletionCandidate)other;
+      return this.count == that.count && this.value.equals(that.value);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Long.valueOf(count).hashCode() ^ value.hashCode();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "CompletionCandidate[" + value + ", " + count + "]";
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
new file mode 100644
index 0000000..2a4c003
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+/**
+ * Tuple Class for JdbcOutput of StreamingWordExtract.
+ * Holds a single string property exposed through a getter/setter pair.
+ */
+public class PojoEvent extends Object
+{
+  private String stringValue;
+  
+  /**
+   * @return a readable form of this event that includes the current string value
+   */
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [stringValue=" + getStringValue() + "]";
+  }
+  
+  /**
+   * @param newString the string to store in this event
+   */
+  public void setStringValue(String newString)
+  {
+    this.stringValue = newString;
+  }
+  
+  /**
+   * @return the stored string, or null if it has not been set
+   */
+  public String getStringValue()
+  {
+    return this.stringValue;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
new file mode 100644
index 0000000..2ffdc82
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static java.sql.Types.VARCHAR;
+
+/**
+ * Beam StreamingWordExtract Example.
+ */
+@ApplicationAnnotation(name = "StreamingWordExtract")
+public class StreamingWordExtract implements StreamingApplication
+{
+  // Both counters are plain static ints shared by every instance in the JVM; they are incremented by the
+  // function classes of this application without any synchronization.
+  private static int wordCount = 0; // Counts the tokens that ExtractWords has produced so far.
+  private static int entriesMapped = 0; // Counts the entries that PojoMapper has created so far.
+  
+  /**
+   * @return the current value of the static word counter
+   */
+  public int getWordCount()
+  {
+    return wordCount;
+  }
+  
+  /**
+   * @return the current value of the static mapped-entries counter
+   */
+  public int getEntriesMapped()
+  {
+    return entriesMapped;
+  }
+  
+  /**
+   * A FlatMapFunction that tokenizes lines of text into individual words.
+   * The line is split on every run of characters other than letters, digits and apostrophes, and the
+   * static word counter is increased by the number of tokens returned.
+   * Note that the split can yield an empty token (for example when the line starts with a separator);
+   * such tokens are returned and counted here.
+   */
+  public static class ExtractWords implements Function.FlatMapFunction<String, String>
+  {
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
+      wordCount += result.size();
+      return result;
+    }
+  }
+  
+  
+  /**
+   * A MapFunction that uppercases a word.
+   * NOTE(review): {@code toUpperCase()} without an argument uses the JVM default locale, so the result
+   * can differ between locales - confirm whether a fixed locale is wanted here.
+   */
+  public static class Uppercase implements Function.MapFunction<String, String>
+  {
+    @Override
+    public String f(String input)
+    {
+      return input.toUpperCase();
+    }
+  }
+  
+  
+  /**
+   * A filter function to filter out empty strings.
+   * Returns true for every string of length greater than zero; whitespace-only strings are kept.
+   */
+  public static class EmptyStringFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      return !input.isEmpty();
+    }
+  }
+  
+  
+  /**
+   * A map function to map the result string to a pojo entry.
+   * Every call creates a new {@link PojoEvent} carrying the input string and increments the static
+   * mapped-entries counter.
+   */
+  public static class PojoMapper implements Function.MapFunction<String, Object>
+  {
+  
+    @Override
+    public Object f(String input)
+    {
+      PojoEvent pojo = new PojoEvent();
+      pojo.setStringValue(input);
+      entriesMapped++;
+      return pojo;
+    }
+  }
+  
+  /**
+   * Build the field infos for the {@link JdbcPOJOInsertOutputOperator}.
+   * The returned list has a single entry that pairs the name "STRINGVALUE" with the expression
+   * "stringValue", declared as a STRING support type with SQL type VARCHAR.
+   * @return a new mutable list containing that single field info
+   */
+  private static List<JdbcFieldInfo> addFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
+    fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
+    return fieldInfos;
+  }
+  
+  /**
+   * Populate dag with High-Level API.
+   * The stream reads from the folder ./src/test/resources/data, splits each line into words, drops empty
+   * strings, uppercases the words, wraps each one in a pojo and hands the pojos to a JDBC output operator
+   * configured for the table "TestTable".
+   * @param dag the DAG that the assembled stream is populated into
+   * @param conf the application configuration; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addFieldInfos());
+    // No connection settings are applied to the store here.
+    // NOTE(review): presumably they are supplied through external properties - confirm.
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+    jdbcOutput.setTablename("TestTable");
+    
+    // Create a stream reading from a folder.
+    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
+
+    // Extract all the words from the input line of text.
+    stream.flatMap(new ExtractWords())
+      
+        // Filter out the empty strings.
+        .filter(new EmptyStringFilter())
+      
+        // Change every word to uppercase.
+        .map(new Uppercase())
+      
+        // Map the resulting word to a Pojo entry.
+        .map(new PojoMapper())
+      
+        // Output the entries to JdbcOutput and insert them into a table.
+        .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
+    
+    // populateDag is invoked on the source stream object, not on the value returned by endWith.
+    stream.populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
new file mode 100644
index 0000000..a9e7744
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Specialized TopNByKey accumulation for AutoComplete Demo.
+ */
+public class TopNByKey implements
+    Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
+{
+  int n = 10;
+
+  Comparator comparator;
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public Map<String, Long> defaultAccumulatedValue()
+  {
+    return new HashMap<>();
+  }
+
+  /**
+   * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a
+   * new entry otherwise.
+   * @param accumulatedValue
+   * @param input
+   * @return
+   */
+  @Override
+  public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
+  {
+    accumulatedValue.put(input.getValue(), input.getCount());
+    return accumulatedValue;
+  }
+
+  /**
+   * Merge two Maps together. For every key, keep the larger value in the resulted Map.
+   * @param accumulatedValue1
+   * @param accumulatedValue2
+   * @return
+   */
+  @Override
+  public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
+  {
+    for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
+      if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) {
+        continue;
+      }
+      accumulatedValue1.put(entry.getKey(), entry.getValue());
+    }
+    return accumulatedValue1;
+  }
+
+  /**
+   * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing
+   * those entries.
+   * @param accumulatedValue
+   * @return
+   */
+  @Override
+  public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
+  {
+    LinkedList<CompletionCandidate> result = new LinkedList<>();
+    for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
+      int k = 0;
+      for (CompletionCandidate inMemory : result) {
+        if (entry.getValue() > inMemory.getCount()) {
+          break;
+        }
+        k++;
+      }
+      result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue()));
+      if (result.size() > n) {
+        result.remove(result.get(result.size() - 1));
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
+  {
+    return new LinkedList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
new file mode 100644
index 0000000..de4e590
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TopWikipediaSessions Example.
+ */
+@ApplicationAnnotation(name = "TopWikipediaSessions")
+public class TopWikipediaSessions implements StreamingApplication
+{
+  /**
+   * A generator that outputs a stream of combinations of some users and some randomly generated edit time.
+   * Each call to {@link #emitTuples()} emits one (user, timestamp) pair and then sleeps for the
+   * operator's SPIN_MILLIS value.
+   */
+  public static class SessionGen extends BaseOperator implements InputOperator
+  {
+    private String[] names = new String[]{"user1", "user2", "user3", "user4"};
+    public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
+  
+    // Generated timestamps fall within this range after minTimestamp.
+    private static final Duration RAND_RANGE = Duration.standardDays(365);
+    private Long minTimestamp;
+    private long sleepTime;
+    // Number of tuples emitted since the last setup(); static, so shared by all instances in the JVM.
+    private static int tupleCount = 0;
+  
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+  
+    /**
+     * Pick one element of the given array at random.
+     */
+    private String randomName(String[] names)
+    {
+      int index = new Random().nextInt(names.length);
+      return names[index];
+    }
+  
+    /**
+     * Reset the tuple counter, take the current time as the lower bound of generated timestamps and read
+     * the sleep interval from the operator context.
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      tupleCount = 0;
+      minTimestamp = System.currentTimeMillis();
+      sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    }
+  
+    /**
+     * Emit one randomly chosen user together with a random timestamp, then pause.
+     */
+    @Override
+    public void emitTuples()
+    {
+      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
+      long randomTimestamp = minTimestamp + randMillis;
+      output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
+      tupleCount++;
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // The sleep is only a throttle, so there is nothing to recover; restore the interrupt status so
+        // that the caller can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+  
+  /**
+   * An operator that keeps the most recently received result lists in a static list, holding at most
+   * {@code resultSize} of them.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private final int resultSize = 5;
+    // Static, so the collected results are shared by all instances in the JVM.
+    private static List<List<TempWrapper>> result = new ArrayList<>();
+  
+    /**
+     * @return the live static list of collected results (not a copy)
+     */
+    public static List<List<TempWrapper>> getResult()
+    {
+      return result;
+    }
+  
+    public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
+      {
+        // Evict the oldest entry once the list is full, then append the new one.
+        if (result.size() == resultSize) {
+          result.remove(0);
+        }
+        result.add(tuple.getValue());
+      }
+    };
+  }
+  
+  
+  /**
+   * Convert the upstream (user, time) combination to a timestamped tuple of user.
+   * The key of the pair becomes the tuple value and the value of the pair becomes the tuple timestamp.
+   */
+  static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
+  {
+    @Override
+    public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
+    {
+      long timestamp = input.getValue();
+      String userName = input.getKey();
+   
+      // Sets the implicit timestamp field to be used in windowing.
+      return new Tuple.TimestampedTuple<>(timestamp, userName);
+      
+    }
+  }
+  
+  /**
+   * Computes the number of edits in each user session.  A session is defined as
+   * a string of edits where each is separated from the next by less than an hour.
+   */
+  static class ComputeSessions
+      extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
+    {
+      return inputStream
+        
+        // Chunk the stream into session windows.
+        .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        
+        // Count the number of edits for a user within one session.
+        .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
+        {
+          @Override
+          public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
+          {
+            // Each edit contributes a count of 1 under the user's name and keeps its original timestamp.
+            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
+          }
+        }, name("ComputeSessions"));
+    }
+  }
+  
+  /**
+   * A comparator class used for comparing two TempWrapper objects.
+   * The wrappers are ordered by the Long value of their wrapped KeyValPair; key and timestamp are ignored.
+   */
+  public static class Comp implements Comparator<TempWrapper>
+  {
+    @Override
+    public int compare(TempWrapper o1, TempWrapper o2)
+    {
+      return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
+    }
+  }
+  
+  /**
+   * A function to extract timestamp from a TempWrapper object.
+   * NOTE(review): the parameter is annotated {@code @Nullable} but is dereferenced without a check, so a
+   * null input results in a NullPointerException - confirm that callers never pass null.
+   */
+  // TODO: Need to revisit and change back to using TimestampedTuple.
+  public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
+  {
+    @Override
+    public Long apply(@Nullable TempWrapper input)
+    {
+      return input.getTimestamp();
+    }
+  }
+  
+  /**
+   * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
+   * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue is resolved, we can
+   * remove this class.
+   */
+  public static class TempWrapper
+  {
+    private KeyValPair<String, Long> value;
+    private Long timestamp;
+    
+    // No-arg constructor; leaves both fields null.
+    public TempWrapper()
+    {
+      
+    }
+    
+    /**
+     * @param value the wrapped key/value pair
+     * @param timestamp the timestamp associated with the pair
+     */
+    public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
+    {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  
+    /**
+     * @return the wrapped pair and the timestamp joined by "  -  "
+     */
+    @Override
+    public String toString()
+    {
+      return this.value + "  -  " + this.timestamp;
+    }
+  
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+  
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+  
+    public KeyValPair<String, Long> getValue()
+    {
+      return value;
+    }
+  
+    public void setValue(KeyValPair<String, Long> value)
+    {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Computes the top 10 sessions, ranked by number of edits, in each month; in this case we use fixed
+   * 30-day windows to represent every month.
+   */
+  private static class TopPerMonth
+      extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+  {
+    
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
+    {
+      // Rank wrappers with Comp, i.e. by the Long value of the wrapped pair, and keep 10 of them.
+      TopN<TempWrapper> topN = new TopN<>();
+      topN.setN(10);
+      topN.setComparator(new Comp());
+      
+      return inputStream
+        
+        // Map the input WindowedTuple to a TempWrapper object, using the begin timestamp of the tuple's
+        // first window as the wrapper timestamp.
+        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
+        {
+          @Override
+          public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+          {
+            return new TempWrapper(input.getValue(), input.getWindows().get(0).getBeginTimestamp());
+          }
+        }, name("TempWrapper"))
+        
+        // Apply window and trigger option again, this time chunk the stream into fixed time windows.
+        .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
+        
+        // Compute the top 10 user-sessions with most number of edits.
+        .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
+    }
+  }
+  
+  /**
+   * A map function that builds a session label of the form "user : windowBegin : windowDuration" from the
+   * first window of the input tuple and emits it as the key of a new pair whose value is the number of
+   * edits in that session. The output tuple is attached to the same first window.
+   */
+  static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
+  {
+    @Override
+    public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+    {
+      KeyValPair<String, Long> session = input.getValue();
+      String label = session.getKey()
+          + " : " + input.getWindows().get(0).getBeginTimestamp()
+          + " : " + input.getWindows().get(0).getDurationMillis();
+      KeyValPair<String, Long> labelled = new KeyValPair<String, Long>(label, session.getValue());
+      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(input.getWindows().get(0), labelled);
+    }
+  }
+  
+  /**
+   * A flatmap function that turns the result into readable format.
+   * Produces one string per wrapper in the input list, formatted as
+   * "key + count : beginTimestampOfFirstWindow".
+   */
+  static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
+  {
+    @Override
+    public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
+    {
+      ArrayList<String> result = new ArrayList<>();
+      for (TempWrapper item : input.getValue()) {
+        String session = item.getValue().getKey();
+        long count = item.getValue().getValue();
+        result.add(session + " + " + count + " : " + input.getWindows().get(0).getBeginTimestamp());
+      }
+      return result;
+    }
+  }
+  
+  /**
+   * A composite transform that computes the top wikipedia sessions.
+   * It chains, in order: ExtractUserAndTimestamp, ComputeSessions, SessionsToStringsDoFn and TopPerMonth.
+   */
+  public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
+  {
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
+    {
+      return inputStream
+        .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
+        .addCompositeStreams(new ComputeSessions())
+        .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
+        .addCompositeStreams(new TopPerMonth());
+    }
+  }
+  
+  /**
+   * Populate the dag with High-Level API: connect a SessionGen input operator through the
+   * ComputeTopSessions transform to a Collector.
+   * @param dag the DAG that the assembled stream is populated into
+   * @param conf the application configuration; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    SessionGen sg = new SessionGen();
+    Collector collector = new Collector();
+    StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
+      .addCompositeStreams(new ComputeTopSessions())
+      .endWith(collector, collector.input, name("collector")).populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
new file mode 100644
index 0000000..2cc04d1
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -0,0 +1,521 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.Group;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam's TrafficRoutes example.
+ */
+@ApplicationAnnotation(name = "TrafficRoutes")
+public class TrafficRoutes implements StreamingApplication
+{
+  // Station id -> route name lookup built by buildStationInfo(); readings from stations missing here are dropped.
+  static Map<String, String> sdStations = buildStationInfo();
+  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
+  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
+  
+  /**
+   * This class holds information about a station reading's average speed.
+   *
+   * All three fields are nullable; instances order themselves by timestamp (see {@link #compareTo(StationSpeed)}).
+   */
+  public static class StationSpeed implements Comparable<StationSpeed>
+  {
+    @Nullable
+    String stationId;
+    @Nullable
+    Double avgSpeed;
+    @Nullable
+    Long timestamp;
+
+    /** Creates an empty reading; every field is {@code null} until set. */
+    public StationSpeed() {}
+
+    /**
+     * @param stationId id of the station that produced the reading
+     * @param avgSpeed average speed reported by the station
+     * @param timestamp timestamp attached to the reading
+     */
+    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
+    {
+      this.stationId = stationId;
+      this.avgSpeed = avgSpeed;
+      this.timestamp = timestamp;
+    }
+
+    public void setAvgSpeed(@Nullable Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public void setStationId(@Nullable String stationId)
+    {
+      this.stationId = stationId;
+    }
+
+    public void setTimestamp(@Nullable Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Nullable
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public String getStationId()
+    {
+      return this.stationId;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return this.avgSpeed;
+    }
+
+    /**
+     * Orders readings by timestamp, oldest first.
+     *
+     * The timestamp is nullable, so it is compared without unboxing: a reading that has no timestamp sorts
+     * before any reading that has one, and two readings without a timestamp compare as equal. Previously a
+     * missing timestamp made sorting fail with a NullPointerException.
+     *
+     * @param other the reading to compare against
+     * @return a negative value, zero, or a positive value as this reading is older than, as old as, or newer
+     *         than {@code other}
+     */
+    @Override
+    public int compareTo(StationSpeed other)
+    {
+      if (this.timestamp == null) {
+        return other.timestamp == null ? 0 : -1;
+      }
+      if (other.timestamp == null) {
+        return 1;
+      }
+      return this.timestamp.compareTo(other.timestamp);
+    }
+  }
+  
+  /**
+   * Value holder describing one route: its name, its average speed and whether a slowdown was detected.
+   */
+  static class RouteInfo
+  {
+    @Nullable
+    String route;
+    @Nullable
+    Double avgSpeed;
+    @Nullable
+    Boolean slowdownEvent;
+
+    /** Creates an empty instance; all fields stay {@code null}. */
+    public RouteInfo()
+    {
+    }
+
+    /**
+     * @param route name of the route
+     * @param avgSpeed average speed computed for the route
+     * @param slowdownEvent whether the route is considered to be slowing down
+     */
+    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
+    {
+      this.slowdownEvent = slowdownEvent;
+      this.avgSpeed = avgSpeed;
+      this.route = route;
+    }
+
+    /** @return the slowdown flag, possibly {@code null} */
+    public Boolean getSlowdownEvent()
+    {
+      return slowdownEvent;
+    }
+
+    /** @return the average speed, possibly {@code null} */
+    public Double getAvgSpeed()
+    {
+      return avgSpeed;
+    }
+
+    /** @return the route name, possibly {@code null} */
+    public String getRoute()
+    {
+      return route;
+    }
+  }
+  
+  /**
+   * Map function that reads the first comma-separated field of the input line as a long timestamp and wraps
+   * the unchanged line in a {@link Tuple.TimestampedTuple} carrying that timestamp.
+   */
+  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
+  {
+
+    @Override
+    public Tuple.TimestampedTuple<String> f(String input)
+    {
+      final String rawTimestamp = tryParseTimestamp(input.split(","));
+      // Long.parseLong rejects a missing or non-numeric field with a NumberFormatException.
+      final long eventTime = Long.parseLong(rawTimestamp);
+      return new Tuple.TimestampedTuple<>(eventTime, input);
+    }
+  }
+  
+  /**
+   * Keeps only 'main line' readings from stations that belong to one of the predefined routes and emits them
+   * as (route, station speed) pairs. Every other reading produces an empty result.
+   */
+  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
+  {
+
+    @Override
+    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
+    {
+      ArrayList<KeyValPair<String, StationSpeed>> matches = new ArrayList<>();
+      String[] fields = input.getValue().split(",");
+
+      // For this analysis, use only 'main line' station types.
+      if (!"ML".equals(tryParseStationType(fields))) {
+        return matches;
+      }
+
+      Double speed = tryParseAvgSpeed(fields);
+      String station = tryParseStationId(fields);
+      // For this simple example, filter out everything but some hardwired routes.
+      if (speed == null || station == null || !sdStations.containsKey(station)) {
+        return matches;
+      }
+
+      StationSpeed reading = new StationSpeed(station, speed, input.getTimestamp());
+      // The tuple key is the 'route' name stored in the 'sdStations' hash.
+      matches.add(new KeyValPair<String, StationSpeed>(sdStations.get(station), reading));
+      return matches;
+    }
+  }
+  
+  /**
+   * Computes per-route statistics for one window: the average of all non-null speeds and a 'slowdown' flag.
+   * Readings are visited in timestamp order; each non-null speed is compared with the previous non-null speed
+   * of the same station and counted either as a speed-up (strictly faster) or as a slowdown (anything else).
+   * The route is flagged as slowing down when slowdowns are at least twice the speed-ups.
+   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
+   */
+  static class GatherStats
+      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
+  {
+    @Override
+    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
+    {
+      ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> output = new ArrayList<>();
+      KeyValPair<String, List<StationSpeed>> routeReadings = input.getValue();
+      String route = routeReadings.getKey();
+
+      // Work on a copy so the sort does not reorder the list held by the incoming tuple.
+      List<StationSpeed> readings = Lists.newArrayList(routeReadings.getValue());
+      Collections.sort(readings);
+
+      Map<String, Double> lastSpeedByStation = new HashMap<>();
+      double total = 0.0;
+      int samples = 0;
+      int faster = 0;
+      int notFaster = 0;
+      for (StationSpeed reading : readings) {
+        Double current = reading.getAvgSpeed();
+        if (current == null) {
+          // Readings without a speed take no part in the average or in the comparisons.
+          continue;
+        }
+        total += current;
+        samples++;
+        // put() hands back the speed previously recorded for this station, or null for its first reading.
+        Double previous = lastSpeedByStation.put(reading.getStationId(), current);
+        if (previous == null) {
+          continue;
+        }
+        if (previous < current) {
+          faster++;
+        } else {
+          notFaster++;
+        }
+      }
+
+      if (samples > 0) {
+        double mean = total / samples;
+        boolean slowingDown = notFaster >= 2 * faster;
+        RouteInfo summary = new RouteInfo(route, mean, slowingDown);
+        output.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, summary)));
+      }
+      // With no usable speed there is no average to compute and the result stays empty.
+      return output;
+    }
+  }
+  
+  /**
+   * Plain bean carrying one formatted result row (route key, average speed, slowdown flag, timestamp),
+   * intended for output to JDBC.
+   */
+  static class OutputPojo
+  {
+    private Double avgSpeed;
+    private Boolean slowdownEvent;
+    private String key;
+    private Long timestamp;
+
+    /** Creates an empty row; every property is {@code null} until set. */
+    public OutputPojo()
+    {
+    }
+
+    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
+    {
+      this.key = key;
+      this.timestamp = timestamp;
+      this.avgSpeed = avgSpeed;
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public String getKey()
+    {
+      return key;
+    }
+
+    public void setKey(String key)
+    {
+      this.key = key;
+    }
+
+    public Double getAvgSpeed()
+    {
+      return avgSpeed;
+    }
+
+    public void setAvgSpeed(Double avgSpeed)
+    {
+      this.avgSpeed = avgSpeed;
+    }
+
+    public Boolean getSlowdownEvent()
+    {
+      return slowdownEvent;
+    }
+
+    public void setSlowdownEvent(Boolean slowdownEvent)
+    {
+      this.slowdownEvent = slowdownEvent;
+    }
+
+    public Long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    /** Renders the row as {@code key + avgSpeed + slowdownEvent + timestamp}, separated by " + ". */
+    @Override
+    public String toString()
+    {
+      return new StringBuilder()
+          .append(key).append(" + ")
+          .append(avgSpeed).append(" + ")
+          .append(slowdownEvent).append(" + ")
+          .append(timestamp)
+          .toString();
+    }
+
+  }
+  
+  /**
+   * Terminal operator that stores every received row in a static map keyed by (timestamp, route key), with
+   * (average speed, slowdown flag) as the value. A later row with the same key replaces the earlier one.
+   */
+  public static class Collector extends BaseOperator
+  {
+    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
+
+    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
+    {
+      @Override
+      public void process(OutputPojo tuple)
+      {
+        KeyValPair<Long, String> timeAndRoute = new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey());
+        KeyValPair<Double, Boolean> speedAndSlowdown = new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent());
+        result.put(timeAndRoute, speedAndSlowdown);
+      }
+    };
+
+    /** @return the static map holding all rows collected so far */
+    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
+    {
+      return result;
+    }
+  }
+  
+  /**
+   * Map function that turns a timestamped (route, {@link RouteInfo}) pair into an {@link OutputPojo} carrying
+   * the route's average speed, its slowdown flag, the route key and the tuple's timestamp.
+   */
+  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
+    {
+      KeyValPair<String, RouteInfo> routeStats = input.getValue();
+      RouteInfo stats = routeStats.getValue();
+      return new OutputPojo(stats.getAvgSpeed(), stats.getSlowdownEvent(), routeStats.getKey(), input.getTimestamp());
+    }
+  }
+    
+  
+  /**
+   * This composite transformation extracts speed info from traffic station readings.
+   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
+   * Lastly, it formats the results for JDBC.
+   *
+   * The three steps are an {@code accumulateByKey} with a {@link Group} accumulation, a flat map through
+   * {@link GatherStats} and a map through {@link FormatStatsFn}.
+   */
+  static class TrackSpeed extends
+      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
+  {
+    /**
+     * @param inputStream windowed stream of (route, station speed) pairs
+     * @return windowed stream of formatted {@link OutputPojo} rows
+     */
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
+    {
+      // Apply a GroupByKey transform to collect a list of all station
+      // readings for a given route.
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
+          inputStream
+          .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
+            {
+              // Each pair is wrapped with the timestamp stored in its own StationSpeed reading.
+              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
+            }
+          }, name("GroupByKey"));
+      
+      // Analyze 'slowdown' over the route readings.
+      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
+          .flatMap(new GatherStats(), name("GatherStats"));
+      
+      // Format the results for writing to JDBC table.
+      WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
+      
+      return results;
+    }
+  }
+  
+  
+  /**
+   * Parses the average speed, which is field 3 of a split input record.
+   *
+   * @param inputItems the comma-separated fields of one reading
+   * @return the parsed speed, or {@code null} when the field is missing or is not a valid number
+   */
+  private static Double tryParseAvgSpeed(String[] inputItems)
+  {
+    // Test for the missing value explicitly instead of catching the NullPointerException that
+    // Double.parseDouble(null) would throw; a null array still yields null, as before.
+    String rawSpeed = inputItems == null ? null : tryParseString(inputItems, 3);
+    if (rawSpeed == null) {
+      return null;
+    }
+    try {
+      return Double.parseDouble(rawSpeed);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+  
+  /** Returns the station type, read from field 2 of the split record via {@code tryParseString}. */
+  private static String tryParseStationType(String[] inputItems)
+  {
+    final int stationTypeField = 2;
+    return tryParseString(inputItems, stationTypeField);
+  }
+  
+  /** Returns the station id, read from field 1 of the split record via {@code tryParseString}. */
+  private static String tryParseStationId(String[] inputItems)
+  {
+    final int stationIdField = 1;
+    return tryParseString(inputItems, stationIdField);
+  }
+  
+  /** Returns the raw timestamp text, read from field 0 of the split record via {@code tryParseString}. */
+  private static String tryParseTimestamp(String[] inputItems)
+  {
+    final int timestampField = 0;
+    return tryParseString(inputItems, timestampField);
+  }
+  
+  /**
+   * Returns the field at {@code index}, or {@code null} when the record has no such field.
+   *
+   * The bound check is strict: valid indices run from 0 to {@code length - 1}. The previous
+   * {@code length >= index} test let {@code index == length} through and threw
+   * ArrayIndexOutOfBoundsException on records that were one field short.
+   *
+   * @param inputItems the split fields of one record
+   * @param index zero-based position of the wanted field
+   * @return the field text, or {@code null} if {@code index} is outside the array
+   */
+  private static String tryParseString(String[] inputItems, int index)
+  {
+    return index >= 0 && index < inputItems.length ? inputItems[index] : null;
+  }
+  
+  /**
+   * Builds the hard-wired lookup from San Diego sensor station ID to the name of the 'route' it belongs to.
+   *
+   * @return a new map with one entry per tracked station
+   */
+  private static Map<String, String> buildStationInfo()
+  {
+    final String[][] stationToRoute = {
+      {"1108413", "SDRoute1"}, // from freeway 805 S
+      {"1108699", "SDRoute2"}, // from freeway 78 E
+      {"1108702", "SDRoute2"},
+    };
+    Map<String, String> routesByStation = new Hashtable<String, String>();
+    for (String[] entry : stationToRoute) {
+      routesByStation.put(entry[0], entry[1]);
+    }
+    return routesByStation;
+  }
+  
+  /**
+   * A dummy generator to generate some traffic information.
+   *
+   * Each call to {@link #emitTuples()} emits one reading for every (station type, station id) combination,
+   * formatted as {@code timestamp,stationId,stationType,speed}, and sleeps 50 ms after each tuple.
+   */
+  public static class InfoGen extends BaseOperator implements InputOperator
+  {
+    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
+    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
+    private double ave = 55.0;
+    private long timestamp;
+    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
+    private static int tupleCount = 0;
+
+    /** @return the number of tuples emitted since the last {@link #setup(Context.OperatorContext)} */
+    public static int getTupleCount()
+    {
+      return tupleCount;
+    }
+
+    /** Resets the tuple counter and records the current wall-clock time as the base event time. */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      tupleCount = 0;
+      timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      for (String stationType : stationTypes) {
+        for (int stationID : stationIDs) {
+          // Speed is drawn from [ave, ave + 20); the event time lies within RAND_RANGE after the base time.
+          double speed = Math.random() * 20 + ave;
+          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
+          try {
+            output.emit(time + "," + stationID + "," + stationType + "," + speed);
+            tupleCount++;
+
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            // Do not swallow the interrupt: restore the flag so the caller can observe it, and stop emitting
+            // instead of running into the next sleep, which would be interrupted again straight away.
+            Thread.currentThread().interrupt();
+            return;
+          } catch (Exception e) {
+            // Ignore it
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Populates the DAG: readings from {@link InfoGen} are timestamped, filtered to the tracked routes, windowed
+   * with sliding time windows, analyzed by {@link TrackSpeed}, printed, and finally handed to a {@link Collector}.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    InfoGen infoGen = new InfoGen();
+    Collector collector = new Collector();
+    
+    // Create a stream from the input operator.
+    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
+        
+        // Extract the timestamp from the input and wrap it into a TimestampedTuple.
+        .map(new ExtractTimestamps(), name("ExtractTimestamps"));
+    
+    stream
+        // Extract the average speed of a station.
+        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
+      
+        // Apply window and trigger option.
+        // Windows are WINDOW_DURATION minutes long and slide every WINDOW_SLIDE_EVERY minutes; the trigger is
+        // configured for early firings every 5000 ms with accumulating fired panes.
+        .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
+        
+        // Apply TrackSpeed composite transformation to compute the route information.
+        .addCompositeStreams(new TrackSpeed())
+      
+        // print the result to console.
+        .print()
+        .endWith(collector, collector.input, name("Collector"))
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
new file mode 100644
index 0000000..ecad622
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Auto Complete Hashtag Demo with real time twitter input. In order to run this application, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter those information
+ * accordingly in /resources/META-INF/properties.xml.
+ *
+ * The authentication requires following 4 information.
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ */
+@ApplicationAnnotation(name = "TwitterAutoComplete")
+public class TwitterAutoComplete implements StreamingApplication
+{
+  /**
+   * Helper for checking whether a string consists solely of US-ASCII characters.
+   */
+  public static class StringUtils
+  {
+    // Kept for compatibility; it now only supplies the charset. It must not be used for encoding directly
+    // because a CharsetEncoder is stateful and not safe for use by multiple concurrent threads.
+    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
+
+    /**
+     * Tells whether every character of {@code v} can be encoded as US-ASCII.
+     *
+     * A fresh encoder is created per call, so concurrent callers never share encoder state.
+     *
+     * @param v the text to check
+     * @return {@code true} if the whole string is US-ASCII encodable (this includes the empty string)
+     */
+    public static boolean isAscii(String v)
+    {
+      return encoder.charset().newEncoder().canEncode(v);
+    }
+  }
+
+  /**
+   * FlatMap function that extracts all hashtags from the text of a tweet.
+   *
+   * A hashtag is a '#' followed by one or more non-whitespace characters; the leading '#' is stripped from
+   * each emitted tag.
+   */
+  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
+  {
+    // Compiled once and shared: a Pattern is immutable and thread-safe, and recompiling it for every tweet
+    // is wasted work. Each call still creates its own Matcher.
+    private static final Pattern HASHTAG = Pattern.compile("#\\S+");
+
+    /**
+     * @param input the tweet text
+     * @return the hashtags found in {@code input}, in order of appearance, without the leading '#'
+     */
+    @Override
+    public Iterable<String> f(String input)
+    {
+      List<String> result = new LinkedList<>();
+      Matcher m = HASHTAG.matcher(input);
+      while (m.find()) {
+        result.add(m.group().substring(1));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Lower latency, but more expensive.
+   *
+   * Expands every candidate into (prefix, candidate) pairs with {@link AllPrefixes} and then accumulates the
+   * pairs by prefix with a {@link TopNByKey} accumulation limited to {@code candidatesPerPrefix} entries.
+   */
+  private static class ComputeTopFlat
+      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    private final int minPrefix;
+
+    /**
+     * @param candidatesPerPrefix the N handed to {@link TopNByKey}
+     * @param minPrefix the shortest prefix length to generate
+     */
+    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.minPrefix = minPrefix;
+    }
+
+    @Override
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
+        WindowedStream<CompletionCandidate> input)
+    {
+      TopNByKey topNByKey = new TopNByKey();
+      topNByKey.setN(candidatesPerPrefix);
+      return input
+        // Prefixes are generated from minPrefix up to a hard-coded maximum length of 3.
+        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
+        .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
+          {
+            // TODO: Should be removed after Auto-wrapping is supported.
+            return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, tuple);
+          }
+        }, name("TopNByKey"));
+    }
+  }
+
+  /**
+   * FlatMap function that extracts all prefixes of the hashtag in the input CompletionCandidate and outputs
+   * KeyValPairs of the lower-cased prefix and the CompletionCandidate.
+   *
+   * Prefix lengths run from {@code minPrefix} up to the smaller of the word length and {@code maxPrefix}.
+   */
+  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
+  {
+    private final int minPrefix;
+    private final int maxPrefix;
+
+    /** Generates every prefix, starting with the empty one. */
+    public AllPrefixes()
+    {
+      this(0, Integer.MAX_VALUE);
+    }
+
+    /** Generates every prefix of at least {@code minPrefix} characters. */
+    public AllPrefixes(int minPrefix)
+    {
+      this(minPrefix, Integer.MAX_VALUE);
+    }
+
+    /**
+     * @param minPrefix shortest prefix length to generate
+     * @param maxPrefix longest prefix length to generate
+     */
+    public AllPrefixes(int minPrefix, int maxPrefix)
+    {
+      this.minPrefix = minPrefix;
+      this.maxPrefix = maxPrefix;
+    }
+
+    @Override
+    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
+    {
+      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
+      String word = input.getValue();
+      int longestPrefix = Math.min(word.length(), maxPrefix);
+      for (int i = minPrefix; i <= longestPrefix; i++) {
+        // Lower-case with a fixed locale: the default-locale variant yields different keys on machines whose
+        // locale has special casing rules (for example Turkish 'I'), which would split counts across keys.
+        result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(java.util.Locale.ROOT), input));
+      }
+      return result;
+    }
+  }
+
+  /**
+   * A Composite stream transform that takes as input a list of tokens and returns
+   * the most common tokens per prefix.
+   *
+   * Tokens are first counted per key, each (token, count) pair is converted into a
+   * {@link CompletionCandidate}, and the candidates are then handed to {@link ComputeTopFlat}.
+   */
+  public static class ComputeTopCompletions
+      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
+  {
+    private final int candidatesPerPrefix;
+    // NOTE(review): stored but never read in this class; compose() always uses ComputeTopFlat.
+    private final boolean recursive;
+
+    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
+    {
+      this.candidatesPerPrefix = candidatesPerPrefix;
+      this.recursive = recursive;
+    }
+
+    /**
+     * Factory method.
+     *
+     * @param candidatesPerPrefix number of candidates to keep for each prefix
+     * @param recursive currently has no effect on the composed stream
+     * @return a new transform instance
+     */
+    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
+    {
+      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
+    {
+
+      ApexStream<CompletionCandidate> candidates = inputStream
+          .countByKey(new Function.ToKeyValue<String, String, Long>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, Long>> f(String input)
+            {
+              // Every occurrence of a token contributes a count of 1 under the token itself as key.
+              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
+            }
+          }, name("Hashtag Count")).map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
+          {
+            @Override
+            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
+            {
+              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
+            }
+          }, name("KeyValPair to CompletionCandidate"));
+
+      // The second argument (1) is ComputeTopFlat's minimum prefix length.
+      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
+
+    }
+  }
+
+  /**
+   * FilterFunction that keeps only tweets made up entirely of ASCII characters; the check itself is
+   * delegated to {@link StringUtils#isAscii(String)}.
+   */
+  static class ASCIIFilter implements Function.FilterFunction<String>
+  {
+    @Override
+    public boolean f(String input)
+    {
+      final boolean asciiOnly = StringUtils.isAscii(input);
+      return asciiOnly;
+    }
+  }
+
+  /**
+   * Populate the dag with High-Level API.
+   *
+   * Tweets from a {@link TwitterSampleInput} are filtered to ASCII-only text, split into hashtags, put into a
+   * global window and passed to {@link ComputeTopCompletions}; the result is printed.
+   *
+   * @param dag the DAG to populate
+   * @param conf the application configuration (not read by this method)
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    TwitterSampleInput input = new TwitterSampleInput();
+
+    WindowOption windowOption = new WindowOption.GlobalWindow();
+
+    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
+        .filter(new ASCIIFilter(), name("ACSII Filter"))
+        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
+
+    // The trigger is configured for early firings every 10 seconds with accumulating fired panes;
+    // ComputeTopCompletions is asked for 10 candidates per prefix.
+    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
+        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
+
+    s.populateDag(dag);
+  }
+}