You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:19 UTC

[3/6] apex-malhar git commit: Added Beam Examples and Implementations of Accumulation.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index 5d4c628..ecd71ae 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -18,19 +18,33 @@
  */
 package org.apache.apex.malhar.stream.sample.cookbook;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+
+import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.util.KeyValPair;
 
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
 /**
  * An example that reads the public 'Shakespeare' data, and for each word in
  * the dataset that is over a given length, generates a string containing the
@@ -40,12 +54,13 @@ import com.datatorrent.lib.util.KeyValPair;
  * key-grouped Collection
  *
  */
-public class CombinePerKeyExamples
+@ApplicationAnnotation(name = "CombinePerKeyExamples")
+public class CombinePerKeyExamples implements StreamingApplication
 {
   // Use the shakespeare public BigQuery sample
   private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
   // We'll track words >= this word length across all plays in the table.
-  private static final int MIN_WORD_LENGTH = 9;
+  private static final int MIN_WORD_LENGTH = 0;
 
   /**
    * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
@@ -76,70 +91,59 @@ public class CombinePerKeyExamples
     @Override
     public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
     {
-      return new SampleBean(input.getValue().getKey(), input.getValue().getValue(), null);
+      return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
     }
   }
-
+  
+  /**
+   * A reduce function to concat two strings together.
+   */
+  public static class Concat extends ReduceFn<String>
+  {
+    @Override
+    public String reduce(String input1, String input2)
+    {
+      return input1 + ", " + input2;
+    }
+  }
+  
   /**
    * Reads the public 'Shakespeare' data, and for each word in the dataset
    * over a given length, generates a string containing the list of play names
    * in which that word appears.
    */
-  static class PlaysForWord
-      extends CompositeStreamTransform<SampleBean, SampleBean>
+  private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
   {
-
+    
     @Override
-    public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+    public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
     {
-      // fix this later
-      return inputStream.map(new ExtractLargeWordsFn())
-          .window(new WindowOption.GlobalWindow())
-          .reduceByKey(new ReduceFn<String>()
-          {
-            @Override
-            public String defaultAccumulatedValue()
-            {
-              return "";
-            }
-
-            @Override
-            public String accumulate(String accumulatedValue, String input)
-            {
-              return accumulatedValue + "," + input;
-            }
-
-            @Override
-            public String merge(String accumulatedValue1, String accumulatedValue2)
-            {
-              return accumulatedValue1 + "," + accumulatedValue2;
-            }
-
-            @Override
-            public String getOutput(String accumulatedValue)
-            {
-              return accumulatedValue;
-            }
-
-            @Override
-            public String getRetraction(String value)
-            {
-              return value;
-            }
-          }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>()
-
+      return inputStream
+          // Extract words from the input SampleBeam stream.
+          .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
+          
+          // Apply window and trigger option to the streams.
+          .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        
+          // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
+          .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
           {
             @Override
             public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
             {
-              return null;
+              return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
             }
-          })
-          .map(new FormatShakespeareOutputFn());
+          }, name("Concat"))
+        
+          // Format the output back to a SampleBeam object.
+          .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
     }
   }
-
-
+  
+  
+  /**
+   * A Java Beam class that contains information about a word appears in a corpus written by Shakespeare.
+   */
   public static class SampleBean
   {
 
@@ -148,17 +152,20 @@ public class CombinePerKeyExamples
 
     }
 
-    public SampleBean(String word, String all_plays, String corpus)
+    public SampleBean(String word, String corpus)
     {
       this.word = word;
-      this.all_plays = all_plays;
       this.corpus = corpus;
     }
-
+  
+    @Override
+    public String toString()
+    {
+      return this.word + " : "  + this.corpus;
+    }
+  
     private String word;
 
-    private String all_plays;
-
     private String corpus;
 
     public void setWord(String word)
@@ -180,58 +187,87 @@ public class CombinePerKeyExamples
     {
       return corpus;
     }
-
-    public void setAll_plays(String all_plays)
-    {
-      this.all_plays = all_plays;
-    }
-
-    public String getAll_plays()
-    {
-      return all_plays;
-    }
   }
-
-  public static class SampleInput implements InputOperator
+  
+  /**
+   * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
+   * data.
+   */
+  public static class SampleInput extends BaseOperator implements InputOperator
   {
 
     public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort();
-
-    @Override
-    public void emitTuples()
+    private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
+    private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
+    private static int i;
+  
+    public static int getI()
     {
-
+      return i;
     }
-
+  
     @Override
-    public void beginWindow(long l)
+    public void setup(Context.OperatorContext context)
     {
-
+      super.setup(context);
+      i = 0;
     }
-
+  
     @Override
-    public void endWindow()
+    public void emitTuples()
     {
-
+      while (i < 1) {
+        for (String word : words) {
+          for (String corpus : corpuses) {
+            beanOutput.emit(new SampleBean(word, corpus));
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException e) {
+              // Ignore it
+            }
+          }
+        }
+        i++;
+      }
+    
     }
-
+  }
+  
+  public static class Collector extends BaseOperator
+  {
+    static List<SampleBean> result;
+  
     @Override
     public void setup(Context.OperatorContext context)
     {
-
+      result = new ArrayList<>();
     }
-
-    @Override
-    public void teardown()
+  
+    public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
     {
-
-    }
+      @Override
+      public void process(SampleBean tuple)
+      {
+        result.add(tuple);
+      }
+    };
   }
-
-
-  public static void main(String[] args) throws Exception
+  
+  /**
+   * Populate dag using High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
   {
     SampleInput input = new SampleInput();
-    StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord());
+    Collector collector = new Collector();
+    StreamFactory.fromInput(input, input.beanOutput, name("input"))
+      .addCompositeStreams(new PlaysForWord())
+      .print()
+      .endWith(collector, collector.input, name("Collector"))
+      .populateDag(dag);
+    
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
new file mode 100644
index 0000000..53426f3
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * Beam DeDupExample.
+ */
+@ApplicationAnnotation(name = "DeDupExample")
+public class DeDupExample implements StreamingApplication
+{
+  
+  public static class Collector extends BaseOperator
+  {
+    private static Tuple.WindowedTuple<List<String>> result;
+    private static boolean done = false;
+  
+    public static Tuple.WindowedTuple<List<String>> getResult()
+    {
+      return result;
+    }
+  
+    public static boolean isDone()
+    {
+      return done;
+    }
+  
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      result = new Tuple.WindowedTuple<>();
+      done = false;
+    }
+  
+    public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<List<String>> tuple)
+      {
+        result = tuple;
+        if (result.getValue().contains("bye")) {
+          done = true;
+        }
+      }
+    };
+  }
+    
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    Collector collector = new Collector();
+    
+    // Create a stream that reads from files in a local folder and output lines one by one to downstream.
+    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
+      
+        // Extract all the words from the input line of text.
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+          }
+        }, name("ExtractWords"))
+      
+        // Change the words to lower case, also shutdown the app when the word "bye" is detected.
+        .map(new Function.MapFunction<String, String>()
+        {
+          @Override
+          public String f(String input)
+          {
+            return input.toLowerCase();
+          }
+        }, name("ToLowerCase"));
+    
+    // Apply window and trigger option.
+    stream.window(new WindowOption.GlobalWindow(),
+        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
+        
+        // Remove the duplicate words and print out the result.
+        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector, collector.input)
+    
+        .populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
new file mode 100644
index 0000000..3643eab
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * Tuple class for JDBC input of {@link MaxPerKeyExamples}.
+ */
+public class InputPojo extends Object
+{
+  private int month;
+  private int day;
+  private int year;
+  private double meanTemp;
+  
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]";
+  }
+  
+  public void setMonth(int month)
+  {
+    this.month = month;
+  }
+  
+  public int getMonth()
+  {
+    return this.month;
+  }
+  
+  public void setDay(int day)
+  {
+    this.day = day;
+  }
+  
+  public int getDay()
+  {
+    return day;
+  }
+  
+  public void setYear(int year)
+  {
+    this.year = year;
+  }
+  
+  public int getYear()
+  {
+    return year;
+  }
+  
+  public void setMeanTemp(double meanTemp)
+  {
+    this.meanTemp = meanTemp;
+  }
+  
+  public double getMeanTemp()
+  {
+    return meanTemp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..97b2696
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.impl.accumulation.Max;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * MaxPerKeyExamples Application from Beam
+ */
+@ApplicationAnnotation(name = "MaxPerKeyExamples")
+public class MaxPerKeyExamples implements StreamingApplication
+{
+  
+  /**
+   *  A map function to extract the mean temperature from {@link InputPojo}.
+   */
+  public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
+  {
+    @Override
+    public KeyValPair<Integer, Double> f(InputPojo row)
+    {
+      Integer month = row.getMonth();
+      Double meanTemp = row.getMeanTemp();
+      return new KeyValPair<Integer, Double>(month, meanTemp);
+    }
+  }
+  
+  
+  /**
+   * A map function to format output to {@link OutputPojo}.
+   */
+  public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
+    {
+      OutputPojo row = new OutputPojo();
+      row.setMonth(input.getValue().getKey());
+      row.setMeanTemp(input.getValue().getValue());
+      return row;
+    }
+  }
+  
+  /**
+   * A composite transformation to perform three tasks:
+   * 1. extract the month and its mean temperature from input pojo.
+   * 2. find the maximum mean temperature for every month.
+   * 3. format the result to a output pojo object.
+   */
+  public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
+  {
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
+    {
+      // InputPojo... => <month, meanTemp> ...
+      WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
+      
+      // month, meanTemp... => <month, max mean temp>...
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
+          temps.accumulateByKey(new Max<Double>(),
+          new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
+            {
+              @Override
+              public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
+              {
+                return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input);
+              }
+            }, name("MaxPerMonth"));
+      
+      // <month, max>... => OutputPojo...
+      WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
+      
+      return results;
+    }
+  }
+  
+  /**
+   * Method to set field info for {@link JdbcPOJOInputOperator}.
+   * @return
+   */
+  private List<FieldInfo> addInputFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
+    return fieldInfos;
+  }
+  
+  /**
+   * Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
+   * @return
+   */
+  private List<JdbcFieldInfo> addOutputFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
+    return fieldInfos;
+  }
+  
+  
+  /**
+   * Populate the dag using High-Level API.
+   * @param dag
+   * @param conf
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
+    jdbcInput.setFieldInfos(addInputFieldInfos());
+  
+    JdbcStore store = new JdbcStore();
+    jdbcInput.setStore(store);
+  
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addOutputFieldInfos());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+    
+    // Create stream that reads from a Jdbc Input.
+    ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
+      
+        // Apply window and trigger option to the stream.
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+      
+        // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo.
+        .map(new Function.MapFunction<Object, InputPojo>()
+        {
+          @Override
+          public InputPojo f(Object input)
+          {
+            return (InputPojo)input;
+          }
+        }, name("ObjectToInputPojo"))
+      
+        // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
+        .addCompositeStreams(new MaxMeanTemp())
+      
+        // Cast the resulted OutputPojo to Object for Jdbc Output to consume.
+        .map(new Function.MapFunction<OutputPojo, Object>()
+        {
+          @Override
+          public Object f(OutputPojo input)
+          {
+            return (Object)input;
+          }
+        }, name("OutputPojoToObject"))
+      
+        // Output the result to Jdbc Output.
+        .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
+    
+    stream.populateDag(dag);
+  
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
new file mode 100644
index 0000000..db2a09e
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}.
+ */
+public class OutputPojo
+{
+  private int month;
+  private double meanTemp;
+  
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
+  }
+  
+  public void setMonth(int month)
+  {
+    this.month = month;
+  }
+  
+  public int getMonth()
+  {
+    return this.month;
+  }
+  
+  public void setMeanTemp(double meanTemp)
+  {
+    this.meanTemp = meanTemp;
+  }
+  
+  public double getMeanTemp()
+  {
+    return meanTemp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
index 903f624..bf23e3a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import org.joda.time.Duration;
 
 import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
@@ -85,31 +86,31 @@ import com.datatorrent.lib.util.KeyValPair;
  * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
  * the window duration, until the first pane of non-late data has been emitted, to see more
  * interesting results.
- * {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC}
+ * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC}
  *
  * <p> To see the late data i.e. dropped by the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
- * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
  *
  * <p>To see the the difference between accumulation mode and discarding mode,
  * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
- * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
- * window DESC, processing_time}
+ * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processingTime}
  *
  * <p> To see speculative results every minute,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
- * ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processingTime}
  *
  * <p> To see speculative results every five minutes after the end of the window
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
- * and freeway = "5" ORDER BY window DESC, processing_time}
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processingTime}
  *
  * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
  * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
  *
  * <p> To reduce the number of results for each query we can add additional where clauses.
  * For examples, To see the results of the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
  * window = "<enter_window_interval>"}
  *
  * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
@@ -135,7 +136,7 @@ public class TriggerExample
    * The example uses "freeway" as the key. Event time is the timestamp associated with the data
    * element and processing time is the time when the data element gets processed in the pipeline.
    * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
-   * Key (freeway) | Value (total_flow) | event time | processing time
+   * Key (freeway) | Value (totalFlow) | event time | processing time
    * 5             | 50                 | 10:00:03   | 10:00:47
    * 5             | 30                 | 10:01:00   | 10:01:03
    * 5             | 30                 | 10:02:00   | 11:07:00
@@ -157,7 +158,7 @@ public class TriggerExample
    * close at 10:44:59, when the watermark passes 10:30:00.
    */
   static class CalculateTotalFlow
-      extends CompositeStreamTransform<String, SampleBean>
+      extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
   {
     private int windowDuration;
 
@@ -167,7 +168,7 @@ public class TriggerExample
     }
 
     @Override
-    public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+    public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
     {
       // Concept #1: The default triggering behavior
       // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
@@ -182,14 +183,14 @@ public class TriggerExample
 
       // The results for the example above with the default trigger and zero allowed lateness
       // would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
       // 5             | 260                | 6                 | true    | true   | ON_TIME
 
       // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
       // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
       // late, and dropped.
-
-      ApexStream<SampleBean> defaultTriggerResults = inputStream
+  
+      WindowedStream<SampleBean> defaultTriggerResults = inputStream
           .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
           new TriggerOption().discardingFiredPanes())
           .addCompositeStreams(new TotalFlow("default"));
@@ -205,13 +206,13 @@ public class TriggerExample
 
       // The results for the example above with the default trigger and ONE_DAY allowed lateness
       // would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
       // 5             | 260                | 6                 | true    | false  | ON_TIME
       // 5             | 60                 | 1                 | false   | false  | LATE
       // 5             | 30                 | 1                 | false   | false  | LATE
       // 5             | 20                 | 1                 | false   | false  | LATE
       // 5             | 60                 | 1                 | false   | false  | LATE
-      ApexStream<SampleBean> withAllowedLatenessResults = inputStream
+      WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
           .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
           new TriggerOption().discardingFiredPanes(),
           Duration.standardDays(1))
@@ -226,7 +227,7 @@ public class TriggerExample
       // We also use accumulatingFiredPanes to build up the results across each pane firing.
 
       // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
       // 5             | 80                 | 2                 | true    | false  | EARLY
       // 5             | 100                | 3                 | false   | false  | EARLY
       // 5             | 260                | 6                 | false   | false  | EARLY
@@ -258,7 +259,7 @@ public class TriggerExample
       // Every pane produced will either be EARLY, ON_TIME or LATE.
 
       // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
       // 5             | 80                 | 2                 | true    | false  | EARLY
       // 5             | 100                | 3                 | false   | false  | EARLY
       // 5             | 260                | 6                 | false   | false  | EARLY
@@ -267,7 +268,7 @@ public class TriggerExample
       // 5             | 430                | 10                | false   | false  | LATE
 
       // For more possibilities of how to build advanced triggers, see {@link Trigger}.
-      ApexStream<SampleBean> sequentialResults = inputStream
+      WindowedStream<SampleBean> sequentialResults = inputStream
           .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
               // Speculative every ONE_MINUTE
           new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
@@ -293,7 +294,7 @@ public class TriggerExample
    * objects, to save to BigQuery.
    */
   static class TotalFlow extends
-      CompositeStreamTransform<String, SampleBean>
+      CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
   {
     private String triggerType;
 
@@ -303,13 +304,10 @@ public class TriggerExample
     }
 
     @Override
-    public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+    public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
     {
-      if (!(inputStream instanceof WindowedStream)) {
-        throw new RuntimeException("Not supported here");
-      }
-      WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream;
-      ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream
+  
+      WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
           .groupByKey(new ExtractFlowInfo());
 
       return flowPerFreeway
@@ -361,13 +359,13 @@ public class TriggerExample
     {
     }
 
-    private String trigger_type;
+    private String triggerType;
 
     private String freeway;
 
-    private int total_flow;
+    private int totalFlow;
 
-    private long number_of_records;
+    private long numberOfRecords;
 
     private String window;
 
@@ -377,9 +375,9 @@ public class TriggerExample
 
     private Date timing;
 
-    private Date event_time;
+    private Date eventTime;
 
-    private Date processing_time;
+    private Date processingTime;
 
     @Override
     public boolean equals(Object o)
@@ -391,50 +389,49 @@ public class TriggerExample
         return false;
       }
       SampleBean that = (SampleBean)o;
-      return total_flow == that.total_flow &&
-          number_of_records == that.number_of_records &&
+      return totalFlow == that.totalFlow &&
+          numberOfRecords == that.numberOfRecords &&
           isFirst == that.isFirst &&
           isLast == that.isLast &&
-          Objects.equals(trigger_type, that.trigger_type) &&
+          Objects.equals(triggerType, that.triggerType) &&
           Objects.equals(freeway, that.freeway) &&
           Objects.equals(window, that.window) &&
           Objects.equals(timing, that.timing) &&
-          Objects.equals(event_time, that.event_time) &&
-          Objects.equals(processing_time, that.processing_time);
+          Objects.equals(eventTime, that.eventTime) &&
+          Objects.equals(processingTime, that.processingTime);
     }
 
     @Override
     public int hashCode()
     {
       return Objects
-          .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time,
-              processing_time);
+          .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
+            processingTime);
     }
 
-    public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window,
-        boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time)
+    public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
     {
 
-      this.trigger_type = trigger_type;
+      this.triggerType = triggerType;
       this.freeway = freeway;
-      this.total_flow = total_flow;
-      this.number_of_records = number_of_records;
+      this.totalFlow = totalFlow;
+      this.numberOfRecords = numberOfRecords;
       this.window = window;
       this.isFirst = isFirst;
       this.isLast = isLast;
       this.timing = timing;
-      this.event_time = event_time;
-      this.processing_time = processing_time;
+      this.eventTime = eventTime;
+      this.processingTime = processingTime;
     }
 
-    public String getTrigger_type()
+    public String getTriggerType()
     {
-      return trigger_type;
+      return triggerType;
     }
 
-    public void setTrigger_type(String trigger_type)
+    public void setTriggerType(String triggerType)
     {
-      this.trigger_type = trigger_type;
+      this.triggerType = triggerType;
     }
 
     public String getFreeway()
@@ -447,24 +444,24 @@ public class TriggerExample
       this.freeway = freeway;
     }
 
-    public int getTotal_flow()
+    public int getTotalFlow()
     {
-      return total_flow;
+      return totalFlow;
     }
 
-    public void setTotal_flow(int total_flow)
+    public void setTotalFlow(int totalFlow)
     {
-      this.total_flow = total_flow;
+      this.totalFlow = totalFlow;
     }
 
-    public long getNumber_of_records()
+    public long getNumberOfRecords()
     {
-      return number_of_records;
+      return numberOfRecords;
     }
 
-    public void setNumber_of_records(long number_of_records)
+    public void setNumberOfRecords(long numberOfRecords)
     {
-      this.number_of_records = number_of_records;
+      this.numberOfRecords = numberOfRecords;
     }
 
     public String getWindow()
@@ -507,24 +504,24 @@ public class TriggerExample
       this.timing = timing;
     }
 
-    public Date getEvent_time()
+    public Date getEventTime()
     {
-      return event_time;
+      return eventTime;
     }
 
-    public void setEvent_time(Date event_time)
+    public void setEventTime(Date eventTime)
     {
-      this.event_time = event_time;
+      this.eventTime = eventTime;
     }
 
-    public Date getProcessing_time()
+    public Date getProcessingTime()
     {
-      return processing_time;
+      return processingTime;
     }
 
-    public void setProcessing_time(Date processing_time)
+    public void setProcessingTime(Date processingTime)
     {
-      this.processing_time = processing_time;
+      this.processingTime = processingTime;
     }
   }
 
@@ -532,10 +529,10 @@ public class TriggerExample
    * Extract the freeway and total flow in a reading.
    * Freeway is used as key since we are calculating the total flow for each freeway.
    */
-  static class ExtractFlowInfo implements Function.MapFunction<String, KeyValPair<String, Integer>>
+  static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
   {
     @Override
-    public KeyValPair<String, Integer> f(String input)
+    public Tuple<KeyValPair<String, Integer>> f(String input)
     {
       String[] laneInfo = input.split(",");
       if (laneInfo[0].equals("timestamp")) {
@@ -553,7 +550,7 @@ public class TriggerExample
       if (totalFlow == null || totalFlow <= 0) {
         return null;
       }
-      return new KeyValPair<>(freeway, totalFlow);
+      return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/resources/META-INF/properties.xml b/demos/highlevelapi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..ead0460
--- /dev/null
+++ b/demos/highlevelapi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+
+  <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work -->
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name>
+    <value></value>
+  </property>
+
+  <!-- Properties for StreamingWordExtract -->
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name>
+    <value>Test</value>
+  </property>
+
+  <!-- Properties for MaxPerKeyExamples -->
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name>
+    <value>InputTable</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name>
+    <value>SELECT * FROM InputTable;</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name>
+    <value>OutputTable</value>
+  </property>
+
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
new file mode 100644
index 0000000..101953f
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link MinimalWordCount}.
+ */
+public class MinimalWordCountTest
+{
+  @Test
+  public void MinimalWordCountTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+  
+    MinimalWordCount app = new MinimalWordCount();
+
+    lma.prepareDAG(app, conf);
+ 
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return MinimalWordCount.Collector.isDone();
+      }
+    });
+    
+    lc.run(10000);
+  
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
new file mode 100644
index 0000000..952356f
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties
+ * for the application before running it:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ */
+public class WindowedWordCountTest
+{
+  @Test
+  public void WindowedWordCountTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new WindowedWordCount(), conf);
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return WindowedWordCount.TextInput.isDone();
+      }
+    });
+    
+    lc.run(60000);
+    
+    Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
+    Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
+    Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
+    Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
+    Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
+  }
+  
+  public long countSum(Map<KeyValPair<Long, String>, Long> map)
+  {
+    long sum = 0;
+    for (long count : map.values()) {
+      sum += count;
+    }
+    return sum;
+  }
+  
+  public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
+  {
+    long sum = 0;
+    for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) {
+      if (entry.getKey().getValue().equals(word)) {
+        sum += entry.getValue();
+      }
+    }
+    return sum;
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
new file mode 100644
index 0000000..dc236f9
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the AutoComplete Application
+ */
+public class AutoCompleteTest
+{
+
+  @Test
+  public void AutoCompleteTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new AutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return AutoComplete.TweetsInput.isDone();
+      }
+    });
+    
+    lc.run(200000);
+  
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("chi"));
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("china"));
+    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("china").get(0).getCount());
+    Assert.assertEquals("China", AutoComplete.Collector.getResult().get("china").get(0).getValue());
+    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("d").size());
+    Assert.assertEquals(3, AutoComplete.Collector.getResult().get("f").size());
+    Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(0).getCount() >= AutoComplete.Collector.getResult().get("f").get(1).getCount());
+    Assert.assertTrue(AutoComplete.Collector.getResult().get("f").get(1).getCount() >= AutoComplete.Collector.getResult().get("f").get(2).getCount());
+  
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
new file mode 100644
index 0000000..bf9b030
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing StreamingWordExtract application
+ */
+public class StreamingWordExtractTest
+{
+  private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String TABLE_NAME = "Test";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+      
+      Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+      
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+          + ")";
+      stmt.executeUpdate(createMetaTable);
+      
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + "(STRINGVALUE VARCHAR(255))";
+      stmt.executeUpdate(createTable);
+      
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+  
+  @After
+  public void cleanTable()
+  {
+    try {
+      Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+      String dropTable = "drop table " + TABLE_NAME;
+      stmt.executeUpdate(dropTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
+  }
+  
+  public int getNumOfEventsInStore()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+      
+      String countQuery = "SELECT count(*) from " + TABLE_NAME;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+  
+  @Test
+  public void StreamingWordExtractTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+    StreamingWordExtract app = new StreamingWordExtract();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumOfEventsInStore() == 36;
+      }
+    });
+    
+    lc.run(10000);
+  
+    Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
+    Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
new file mode 100644
index 0000000..f8ec086
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TopWikipediaSessions} Application.
+ */
+public class TopWikipediaSessionsTest
+{
+  @Test
+  public void TopWikipediaSessionsTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new TopWikipediaSessions(), conf);
+    LocalMode.Controller lc = lma.getController();
+    
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
+      }
+    });
+    
+    lc.run(30000);
+    
+    for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
+      Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
+    }
+  }
+  
+  public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
+  {
+    if (input.size() == 0 || input.size() == 1) {
+      return true;
+    }
+    for (int i = 0; i < input.size() - 2; i++) {
+      if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
new file mode 100644
index 0000000..e363ca7
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TrafficRoutes} Application.
+ */
+public class TrafficRoutesTest
+{
+
+  @Test
+  public void TrafficRoutesTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new TrafficRoutes(), conf);
+    LocalMode.Controller lc = lma.getController();
+    
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TrafficRoutes.InfoGen.getTupleCount() >= 100;
+      }
+    });
+    
+    lc.run(60000);
+    
+    Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty());
+    for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
+      Assert.assertTrue(entry.getValue().getKey() <= 75);
+      Assert.assertTrue(entry.getValue().getKey() >= 55);
+      Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
new file mode 100644
index 0000000..9ba2f25
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
+ * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties
+ * for the application before running it:
+ * Your application consumer key,
+ * Your application consumer secret,
+ * Your twitter access token, and
+ * Your twitter access token secret.
+ *
+ * This test is mainly for local demonstration purpose. Default time to run the application is 1 minute, please
+ * set the time you need to run the application before you run.
+ */
+public class TwitterAutoCompleteTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(org.apache.apex.malhar.stream.sample.complete.AutoCompleteTest.class);
+
+  @Test
+  @Ignore
+  public void TwitterAutoCompleteTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    //uncomment the following lines and change YOUR_XXX to the corresponding information needed.
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUERSECRET");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+    lma.prepareDAG(new TwitterAutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    long start = System.currentTimeMillis();
+    lc.run(60000); // Set your desired time to run the application here.
+    long end = System.currentTimeMillis();
+    long time = end - start;
+    logger.info("Test used " + time + " ms");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..5858013
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link CombinePerKeyExamples}.
+ */
+public class CombinePerKeyExamplesTest
+{
+  @Test
+  public void CombinePerKeyExamplesTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+  
+    CombinePerKeyExamples app = new CombinePerKeyExamples();
+      
+    lma.prepareDAG(app, conf);
+    
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return CombinePerKeyExamples.SampleInput.getI() >= 1;
+      }
+    });
+    lc.run(100000);
+  
+    Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size() - 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..ed4ddb4
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+import com.datatorrent.stram.StramLocalCluster;
+
+
+/**
+ * Test for {@link DeDupExample}.
+ */
+public class DeDupExampleTest
+{
+  @Test
+  public void DeDupExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    
+    DeDupExample app = new DeDupExample();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return DeDupExample.Collector.isDone();
+      }
+    });
+    lc.run(50000);
+  
+    Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..51981de
--- /dev/null
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application.
+ */
+public class MaxPerKeyExamplesTest
+{
+  
+  private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+  private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String INPUT_TABLE = "InputTable";
+  private static final String OUTPUT_TABLE = "OutputTable";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+  private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+  
+  private static final double[] MEANTEMPS = {85.3, 75.4};
+  
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+      
+      Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+      
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+          + ")";
+      stmt.executeUpdate(createMetaTable);
+      
+      String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+          + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createInputTable);
+  
+      String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+          + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createOutputTable);
+      
+      String cleanTable = "truncate table " + INPUT_TABLE;
+      stmt.executeUpdate(cleanTable);
+  
+      stmt = con.createStatement();
+  
+      String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
+      stmt.executeUpdate(sql);
+      
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+  
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+  
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+  
+      String dropInputTable = "DROP TABLE " + INPUT_TABLE;
+      stmt.executeUpdate(dropInputTable);
+  
+      String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
+      stmt.executeUpdate(dropOutputTable);
+      
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+    
+  }
+  
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+    conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+  
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+  }
+  
+  public int getNumEntries()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+    
+      String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+  
+  public Map<Integer, Double> getMaxMeanTemp()
+  {
+    Map<Integer, Double> result = new HashMap<>();
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
+      Statement stmt = con.createStatement();
+    
+      String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      while (resultSet.next()) {
+        result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+        
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+  
+  @Test
+  public void MaxPerKeyExampleTest() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+    
+    MaxPerKeyExamples app = new MaxPerKeyExamples();
+  
+    lma.prepareDAG(app, conf);
+  
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumEntries() == 2;
+      }
+    });
+    
+    lc.run(5000);
+    
+    double[] result = new double[2];
+    result[0] = getMaxMeanTemp().get(6);
+    result[1] = getMaxMeanTemp().get(7);
+    Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dcca7752/demos/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/resources/data/word.txt b/demos/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/demos/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error