Posted to commits@apex.apache.org by th...@apache.org on 2017/03/07 06:58:16 UTC

[11/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
new file mode 100644
index 0000000..9fd9495
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.accumulation.Max;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static java.sql.Types.DOUBLE;
+import static java.sql.Types.INTEGER;
+
+import com.google.common.collect.Lists;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.KeyValPair;
+
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
+/**
+ * MaxPerKeyExamples application, ported from the Apache Beam examples.
+ *
+ * @since 3.5.0
+ */
+@ApplicationAnnotation(name = "MaxPerKeyExamples")
+public class MaxPerKeyExamples implements StreamingApplication
+{
+
+  /**
+   *  A map function to extract the mean temperature from {@link InputPojo}.
+   */
+  public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
+  {
+    @Override
+    public KeyValPair<Integer, Double> f(InputPojo row)
+    {
+      Integer month = row.getMonth();
+      Double meanTemp = row.getMeanTemp();
+      return new KeyValPair<Integer, Double>(month, meanTemp);
+    }
+  }
+
+
+  /**
+   * A map function to format output to {@link OutputPojo}.
+   */
+  public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
+  {
+    @Override
+    public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
+    {
+      OutputPojo row = new OutputPojo();
+      row.setMonth(input.getValue().getKey());
+      row.setMeanTemp(input.getValue().getValue());
+      return row;
+    }
+  }
+
+  /**
+   * A composite transformation that performs three tasks:
+   * 1. Extract the month and its mean temperature from the input POJO.
+   * 2. Find the maximum mean temperature for every month.
+   * 3. Format the result into an output POJO object.
+   */
+  public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
+  {
+    @Override
+    public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
+    {
+      // InputPojo... => <month, meanTemp> ...
+      WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
+
+      // month, meanTemp... => <month, max mean temp>...
+      WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
+          temps.accumulateByKey(new Max<Double>(),
+          new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
+            {
+              @Override
+              public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
+              {
+                return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input);
+              }
+            }, name("MaxPerMonth"));
+
+      // <month, max>... => OutputPojo...
+      WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
+
+      return results;
+    }
+  }
+
+  /**
+   * Builds the mappings between input table columns and {@link InputPojo} fields
+   * for {@link JdbcPOJOInputOperator}.
+   * @return the list of input field mappings
+   */
+  private List<FieldInfo> addInputFieldInfos()
+  {
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER));
+    fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
+    return fieldInfos;
+  }
+
+  /**
+   * Builds the mappings between {@link OutputPojo} fields and output table columns
+   * for {@link JdbcPOJOInsertOutputOperator}.
+   * @return the list of output field mappings
+   */
+  private List<JdbcFieldInfo> addOutputFieldInfos()
+  {
+    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
+    fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
+    return fieldInfos;
+  }
+
+
+  /**
+   * Populate the DAG using the High-Level API.
+   * @param dag the DAG to populate
+   * @param conf the configuration for this application
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
+    jdbcInput.setFieldInfos(addInputFieldInfos());
+
+    JdbcStore store = new JdbcStore();
+    jdbcInput.setStore(store);
+
+    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
+    jdbcOutput.setFieldInfos(addOutputFieldInfos());
+    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
+    jdbcOutput.setStore(outputStore);
+
+    // Create a stream that reads from the JDBC input.
+    ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
+
+        // Apply window and trigger option to the stream.
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+
+        // Because the JDBC input sends out a stream of Object, the tuples need to be cast to InputPojo.
+        .map(new Function.MapFunction<Object, InputPojo>()
+        {
+          @Override
+          public InputPojo f(Object input)
+          {
+            return (InputPojo)input;
+          }
+        }, name("ObjectToInputPojo"))
+
+        // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
+        .addCompositeStreams(new MaxMeanTemp())
+
+        // Cast the resulting OutputPojo to Object for the JDBC output to consume.
+        .map(new Function.MapFunction<OutputPojo, Object>()
+        {
+          @Override
+          public Object f(OutputPojo input)
+          {
+            return (Object)input;
+          }
+        }, name("OutputPojoToObject"))
+
+        // Output the result to Jdbc Output.
+        .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
+
+    stream.populateDag(dag);
+
+  }
+}
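
As a sketch of how this application might be launched outside of a unit test, the LocalMode pattern used by the tests later in this commit applies directly. This assumes the HSQLDB tables referenced in META-INF/properties.xml already exist; the runner class below is hypothetical, placed in the same package as MaxPerKeyExamples, and is not part of this commit:

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.LocalMode;

    public class MaxPerKeyExamplesLocalRunner
    {
      public static void main(String[] args) throws Exception
      {
        LocalMode lma = LocalMode.newInstance();
        Configuration conf = new Configuration(false);
        // Pull in the jdbcInput/jdbcOutput settings (database URL, table names,
        // TUPLE_CLASS port attributes) packaged with the application.
        conf.addResource("META-INF/properties.xml");
        lma.prepareDAG(new MaxPerKeyExamples(), conf);
        // Run the DAG in-process for ten seconds, as the tests in this commit do.
        lma.getController().run(10000);
      }
    }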

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
new file mode 100644
index 0000000..f3d0c64
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+/**
+ * OutputPojo tuple class for the jdbcOutput operator of {@link MaxPerKeyExamples}.
+ *
+ * @since 3.5.0
+ */
+public class OutputPojo
+{
+  private int month;
+  private double meanTemp;
+
+  @Override
+  public String toString()
+  {
+    return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
+  }
+
+  public void setMonth(int month)
+  {
+    this.month = month;
+  }
+
+  public int getMonth()
+  {
+    return this.month;
+  }
+
+  public void setMeanTemp(double meanTemp)
+  {
+    this.meanTemp = meanTemp;
+  }
+
+  public double getMeanTemp()
+  {
+    return meanTemp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
new file mode 100644
index 0000000..962faa5
--- /dev/null
+++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into {@link Window windows} to be processed, and demonstrates using various kinds of
+ * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
+ * each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ *   1. The default triggering behavior
+ *   2. Late data with the default trigger
+ *   3. How to get speculative estimates
+ *   4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data'.
+ * See: <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger:
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait until
+ * after the window duration, when the first pane of non-late data has been emitted, to see more
+ * interesting results.
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data, i.e. data dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p> To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processingTime}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processingTime}
+ *
+ * <p> To see speculative results every five minutes after the end of the window,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processingTime}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query, we can add additional WHERE clauses.
+ * For example, to see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exit.
+ *
+ * @since 3.5.0
+ */
+
+public class TriggerExample
+{
+  // Numeric value of fixed window duration, in minutes
+  public static final int WINDOW_DURATION = 30;
+  // Constants used in triggers.
+  // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+  // ONE_MINUTE is used only with processing time before the end of the window
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+  // FIVE_MINUTES is used only with processing time after the end of the window
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+  public static final Duration ONE_DAY = Duration.standardDays(1);
+
+  /**
+   * This transform demonstrates using triggers to control when data is produced for each window.
+   * Consider an example to understand the results generated by each type of trigger.
+   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+   * element and processing time is the time when the data element gets processed in the pipeline.
+   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+   * Key (freeway) | Value (totalFlow) | event time | processing time
+   * 5             | 50                 | 10:00:03   | 10:00:47
+   * 5             | 30                 | 10:01:00   | 10:01:03
+   * 5             | 30                 | 10:02:00   | 11:07:00
+   * 5             | 20                 | 10:04:10   | 10:05:15
+   * 5             | 60                 | 10:05:00   | 11:03:00
+   * 5             | 20                 | 10:05:01   | 11:07:30
+   * 5             | 60                 | 10:15:00   | 10:27:15
+   * 5             | 40                 | 10:26:40   | 10:26:43
+   * 5             | 60                 | 10:27:20   | 10:27:25
+   * 5             | 60                 | 10:29:00   | 11:11:00
+   *
+   * <p> Dataflow tracks a watermark which records up to what point in event time the data is
+   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+   * behind the current processing time. In practice, the actual value would vary over time based
+   * on the system's knowledge of the current PubSub delay and contents of the backlog (data
+   * that has not yet been processed).
+   *
+   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+   * close at 10:44:59, when the watermark passes 10:30:00.
+   */
+  static class CalculateTotalFlow
+      extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
+  {
+    private int windowDuration;
+
+    CalculateTotalFlow(int windowDuration)
+    {
+      this.windowDuration = windowDuration;
+    }
+
+    @Override
+    public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
+    {
+      // Concept #1: The default triggering behavior
+      // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
+      // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+      // The system also defaults to dropping late data -- data which arrives after the watermark
+      // has passed the event timestamp of the arriving element. This means that the default trigger
+      // will only fire once.
+
+      // Each pane produced by the default trigger with no allowed lateness will be the first and
+      // last pane in the window, and will be ON_TIME.
+
+      // The results for the example above with the default trigger and zero allowed lateness
+      // would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 260                | 6                 | true    | true   | ON_TIME
+
+      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+      // late, and dropped.
+
+      WindowedStream<SampleBean> defaultTriggerResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          new TriggerOption().discardingFiredPanes())
+          .addCompositeStreams(new TotalFlow("default"));
+
+      // Concept #2: Late data with the default trigger
+      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+      // window. Any late data will result in an additional pane being fired for that same window.
+
+      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+      // To be sure to get the last pane when the window closes, use
+      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+      // The results for the example above with the default trigger and ONE_DAY allowed lateness
+      // would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 260                | 6                 | true    | false  | ON_TIME
+      // 5             | 60                 | 1                 | false   | false  | LATE
+      // 5             | 30                 | 1                 | false   | false  | LATE
+      // 5             | 20                 | 1                 | false   | false  | LATE
+      // 5             | 60                 | 1                 | false   | false  | LATE
+      WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+          new TriggerOption().discardingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("withAllowedLateness"));
+
+      // Concept #3: How to get speculative estimates
+      // We can specify a trigger that fires independent of the watermark, for instance after
+      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+      // all the data is available. Since we don't have any triggers that depend on the watermark
+      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+      // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 80                 | 2                 | true    | false  | EARLY
+      // 5             | 100                | 3                 | false   | false  | EARLY
+      // 5             | 260                | 6                 | false   | false  | EARLY
+      // 5             | 320                | 7                 | false   | false  | LATE
+      // 5             | 370                | 9                 | false   | false  | LATE
+      // 5             | 430                | 10                | false   | false  | LATE
+
+      ApexStream<SampleBean> speculativeResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+              //Trigger fires every minute
+          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
+                  // After emitting each pane, it will continue accumulating the elements so that each
+                  // approximation includes all of the previous data in addition to the newly arrived
+                  // data.
+          .accumulatingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("speculative"));
+
+      // Concept #4: Combining late data and speculative estimates
+      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+      // and LATE updates based on late data.
+
+      // Each time a triggering condition is satisfied, it advances to the next trigger.
+      // If there are new elements, this trigger emits a window under the following conditions:
+      // > Early approximations every minute until the end of the window.
+      // > An on-time firing when the watermark has passed the end of the window.
+      // > A firing every five minutes for late data.
+
+      // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
+      // 5             | 80                 | 2                 | true    | false  | EARLY
+      // 5             | 100                | 3                 | false   | false  | EARLY
+      // 5             | 260                | 6                 | false   | false  | EARLY
+      // [First pane fired after the end of the window]
+      // 5             | 320                | 7                 | false   | false  | ON_TIME
+      // 5             | 430                | 10                | false   | false  | LATE
+
+      // For more possibilities of how to build advanced triggers, see {@link Trigger}.
+      WindowedStream<SampleBean> sequentialResults = inputStream
+          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
+              // Speculative every ONE_MINUTE
+          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
+          .withLateFiringsAtEvery(Duration.standardMinutes(5))
+                  // After emitting each pane, it will continue accumulating the elements so that each
+                  // approximation includes all of the previous data in addition to the newly arrived
+                  // data.
+          .accumulatingFiredPanes(),
+          Duration.standardDays(1))
+          .addCompositeStreams(new TotalFlow("sequential"));
+
+      return sequentialResults;
+    }
+
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The remaining parts of the pipeline are needed to produce the output for each
+  // concept above. Not directly relevant to understanding the trigger examples.
+
+  /**
+   * Calculate the total flow and number of records for each freeway and format the results
+   * into {@link SampleBean} objects.
+   */
+  static class TotalFlow extends
+      CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
+  {
+    private String triggerType;
+
+    public TotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    @Override
+    public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
+    {
+
+      WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
+          .groupByKey(new ExtractFlowInfo());
+
+      return flowPerFreeway
+          .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
+          {
+            @Override
+            public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
+            {
+              Iterable<Integer> flows = input.getValue();
+              Integer sum = 0;
+              Long numberOfRecords = 0L;
+              for (Integer value : flows) {
+                sum += value;
+                numberOfRecords++;
+              }
+              return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
+            }
+          })
+          .map(new FormatTotalFlow(triggerType));
+    }
+  }
+
+  /**
+   * Format the results of the total flow calculation into {@link SampleBean} objects.
+   * Adds the triggerType and the processing time; pane and window metadata are not yet
+   * available (see the TODO in the code below).
+   */
+  static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
+  {
+    private String triggerType;
+
+    public FormatTotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    @Override
+    public SampleBean f(KeyValPair<String, String> input)
+    {
+      String[] values = input.getValue().split(",");
+      // TODO: need a callback to get metadata such as window id, pane id, and timestamps.
+      return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long
+          .parseLong(values[1]), null, false, false, null, null, new Date());
+    }
+  }
+
+  public static class SampleBean
+  {
+    public SampleBean()
+    {
+    }
+
+    private String triggerType;
+
+    private String freeway;
+
+    private int totalFlow;
+
+    private long numberOfRecords;
+
+    private String window;
+
+    private boolean isFirst;
+
+    private boolean isLast;
+
+    private Date timing;
+
+    private Date eventTime;
+
+    private Date processingTime;
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SampleBean that = (SampleBean)o;
+      return totalFlow == that.totalFlow &&
+          numberOfRecords == that.numberOfRecords &&
+          isFirst == that.isFirst &&
+          isLast == that.isLast &&
+          Objects.equals(triggerType, that.triggerType) &&
+          Objects.equals(freeway, that.freeway) &&
+          Objects.equals(window, that.window) &&
+          Objects.equals(timing, that.timing) &&
+          Objects.equals(eventTime, that.eventTime) &&
+          Objects.equals(processingTime, that.processingTime);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects
+          .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
+            processingTime);
+    }
+
+    public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
+    {
+
+      this.triggerType = triggerType;
+      this.freeway = freeway;
+      this.totalFlow = totalFlow;
+      this.numberOfRecords = numberOfRecords;
+      this.window = window;
+      this.isFirst = isFirst;
+      this.isLast = isLast;
+      this.timing = timing;
+      this.eventTime = eventTime;
+      this.processingTime = processingTime;
+    }
+
+    public String getTriggerType()
+    {
+      return triggerType;
+    }
+
+    public void setTriggerType(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+
+    public String getFreeway()
+    {
+      return freeway;
+    }
+
+    public void setFreeway(String freeway)
+    {
+      this.freeway = freeway;
+    }
+
+    public int getTotalFlow()
+    {
+      return totalFlow;
+    }
+
+    public void setTotalFlow(int totalFlow)
+    {
+      this.totalFlow = totalFlow;
+    }
+
+    public long getNumberOfRecords()
+    {
+      return numberOfRecords;
+    }
+
+    public void setNumberOfRecords(long numberOfRecords)
+    {
+      this.numberOfRecords = numberOfRecords;
+    }
+
+    public String getWindow()
+    {
+      return window;
+    }
+
+    public void setWindow(String window)
+    {
+      this.window = window;
+    }
+
+    public boolean isFirst()
+    {
+      return isFirst;
+    }
+
+    public void setFirst(boolean first)
+    {
+      isFirst = first;
+    }
+
+    public boolean isLast()
+    {
+      return isLast;
+    }
+
+    public void setLast(boolean last)
+    {
+      isLast = last;
+    }
+
+    public Date getTiming()
+    {
+      return timing;
+    }
+
+    public void setTiming(Date timing)
+    {
+      this.timing = timing;
+    }
+
+    public Date getEventTime()
+    {
+      return eventTime;
+    }
+
+    public void setEventTime(Date eventTime)
+    {
+      this.eventTime = eventTime;
+    }
+
+    public Date getProcessingTime()
+    {
+      return processingTime;
+    }
+
+    public void setProcessingTime(Date processingTime)
+    {
+      this.processingTime = processingTime;
+    }
+  }
+
+  /**
+   * Extract the freeway and total flow from a reading.
+   * The freeway is used as the key since we are calculating the total flow for each freeway.
+   */
+  static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
+  {
+    @Override
+    public Tuple<KeyValPair<String, Integer>> f(String input)
+    {
+      String[] laneInfo = input.split(",");
+      if (laneInfo[0].equals("timestamp")) {
+        // Header row
+        return null;
+      }
+      if (laneInfo.length < 48) {
+        // Skip invalid input; a valid reading has at least 48 comma-separated fields.
+        return null;
+      }
+      String freeway = laneInfo[2];
+      Integer totalFlow = tryIntegerParse(laneInfo[7]);
+      // Ignore records with total flow 0 to make the trigger behavior easier to follow.
+      // Skip records with total flow -1 since they are invalid input.
+      if (totalFlow == null || totalFlow <= 0) {
+        return null;
+      }
+      return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
+    }
+  }
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
+
+  public static void main(String[] args) throws Exception
+  {
+    StreamFactory.fromFolder("some folder")
+        .addCompositeStreams(new CalculateTotalFlow(60));
+
+  }
+
+  private static Integer tryIntegerParse(String number)
+  {
+    try {
+      return Integer.parseInt(number);
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+
+}
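
To make the input contract of ExtractFlowInfo concrete, here is a small sketch. The field layout (freeway in column 2, total flow in column 7, at least 48 comma-separated columns) comes from the parsing code above; the sample values and the demo class itself are invented for illustration:

    import org.apache.apex.malhar.lib.window.Tuple;

    import com.datatorrent.lib.util.KeyValPair;

    // Hypothetical demo; assumes the same package as TriggerExample,
    // since ExtractFlowInfo is package-private.
    public class ExtractFlowInfoDemo
    {
      public static void main(String[] args)
      {
        // Only column 2 (freeway "5") and column 7 (total flow "50") are read
        // by ExtractFlowInfo; the remaining columns are arbitrary padding.
        StringBuilder reading = new StringBuilder("2017-01-01 10:00:03,400001,5,N,ML,0.01,65.2,50");
        for (int i = 8; i < 48; i++) {
          reading.append(",0");
        }
        Tuple<KeyValPair<String, Integer>> tuple =
            new TriggerExample.ExtractFlowInfo().f(reading.toString());
        // Prints "5 -> 50". Header rows (first field "timestamp"), rows with
        // fewer than 48 fields, and non-positive flows all yield null instead.
        System.out.println(tuple.getValue().getKey() + " -> " + tuple.getValue().getValue());
      }
    }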

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/main/resources/META-INF/properties.xml b/examples/highlevelapi/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..ead0460
--- /dev/null
+++ b/examples/highlevelapi/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
+  </property>
+  -->
+
+  <!-- Properties for TwitterAutoComplete; fill in all of them for the application to work -->
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name>
+    <value></value>
+  </property>
+
+  <!-- Properties for StreamingWordExtract -->
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name>
+    <value>Test</value>
+  </property>
+
+  <!-- Properties for MaxPerKeyExamples -->
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name>
+    <value>InputTable</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name>
+    <value>SELECT * FROM InputTable;</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name>
+    <value>root</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name>
+    <value>password</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name>
+    <value>org.hsqldb.jdbcDriver</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
+    <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name>
+    <value>jdbc:hsqldb:mem:test</value>
+  </property>
+  <property>
+    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name>
+    <value>OutputTable</value>
+  </property>
+
+</configuration>
+
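
A note on the key scheme above: each property name encodes the application name, the operator name assigned via name(...) in populateDAG, and either an operator bean property (prop.*) or a port attribute (port.<portName>.attr.*). The tests later in this commit set the same keys programmatically before preparing the DAG; a minimal sketch using two values from this file (Configuration is org.apache.hadoop.conf.Configuration, as imported by those tests):

    // Programmatic equivalent of two of the MaxPerKeyExamples entries above.
    Configuration conf = new Configuration(false);
    conf.set("dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl",
        "jdbc:hsqldb:mem:test");
    conf.set("dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS",
        "org.apache.apex.malhar.stream.sample.cookbook.InputPojo");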

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
new file mode 100644
index 0000000..c078683
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link MinimalWordCount}.
+ */
+public class MinimalWordCountTest
+{
+  @Test
+  public void testMinimalWordCount() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.MinimalWordCount.operator.console.silent", "true");
+    MinimalWordCount app = new MinimalWordCount();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return MinimalWordCount.Collector.isDone();
+      }
+    });
+
+    lc.run(10000);
+
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
+    Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
new file mode 100644
index 0000000..f0c51f6
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link WindowedWordCount} Application.
+ */
+public class WindowedWordCountTest
+{
+  @Test
+  public void testWindowedWordCount() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.WindowedWordCount.operator.console.silent", "true");
+    lma.prepareDAG(new WindowedWordCount(), conf);
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return WindowedWordCount.Collector.isDone();
+      }
+    });
+
+    lc.run(60000);
+
+    Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
+    Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
+    Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
+    Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
+    Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
+  }
+
+  public long countSum(Map<KeyValPair<Long, String>, Long> map)
+  {
+    long sum = 0;
+    for (long count : map.values()) {
+      sum += count;
+    }
+    return sum;
+  }
+
+  public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
+  {
+    long sum = 0;
+    for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) {
+      if (entry.getKey().getValue().equals(word)) {
+        sum += entry.getValue();
+      }
+    }
+    return sum;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
new file mode 100644
index 0000000..4ed2d5d
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link AutoComplete} Application.
+ */
+public class AutoCompleteTest
+{
+
+  @Test
+  public void testAutoComplete() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.AutoComplete.operator.console.silent", "true");
+    lma.prepareDAG(new AutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return AutoComplete.Collector.isDone();
+      }
+    });
+
+    lc.run(200000);
+
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("had"));
+    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("hadoop"));
+    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("mapreduce").get(0).getCount());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
new file mode 100644
index 0000000..dc9cdec
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link StreamingWordExtract} Application.
+ */
+public class StreamingWordExtractTest
+{
+  private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String TABLE_NAME = "Test";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+          + ")";
+      stmt.executeUpdate(createMetaTable);
+
+      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+          + "(STRINGVALUE VARCHAR(255))";
+      stmt.executeUpdate(createTable);
+
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @After
+  public void cleanTable()
+  {
+    try {
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+      String dropTable = "drop table " + TABLE_NAME;
+      stmt.executeUpdate(dropTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void setConfig(Configuration conf)
+  {
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
+  }
+
+  public int getNumOfEventsInStore()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String countQuery = "SELECT count(*) from " + TABLE_NAME;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  @Test
+  public void testStreamingWordExtract() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+    StreamingWordExtract app = new StreamingWordExtract();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumOfEventsInStore() == 36;
+      }
+    });
+
+    lc.run(10000);
+
+    Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
+    Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
new file mode 100644
index 0000000..fddf511
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TopWikipediaSessions} Application.
+ */
+public class TopWikipediaSessionsTest
+{
+  @Test
+  public void testTopWikipediaSessions() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true");
+    lma.prepareDAG(new TopWikipediaSessions(), conf);
+    LocalMode.Controller lc = lma.getController();
+
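+    // The local cluster evaluates this condition periodically and ends the run as soon
+    // as it returns true, so the timeout passed to run() below is only an upper bound.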
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
+      }
+    });
+
+    lc.run(30000);
+
+    for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
+      Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
+    }
+  }
+
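+  /**
+   * Returns true if the session counts in the given list are in non-ascending order,
+   * i.e. every entry's value is greater than or equal to the next entry's value.
+   */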
+  public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
+  {
+    if (input.size() <= 1) {
+      return true;
+    }
+    for (int i = 0; i < input.size() - 1; i++) {
+      if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
new file mode 100644
index 0000000..766fa60
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Testing the {@link TrafficRoutes} Application.
+ */
+public class TrafficRoutesTest
+{
+
+  @Test
+  public void testTrafficRoutes() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.TrafficRoutes.operator.console.silent", "true");
+    lma.prepareDAG(new TrafficRoutes(), conf);
+    LocalMode.Controller lc = lma.getController();
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return TrafficRoutes.InfoGen.getTupleCount() >= 100;
+      }
+    });
+
+    lc.run(60000);
+
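+    // Each aggregated mean speed should fall in the [55, 75] range produced by InfoGen,
+    // and only the two generated routes, SDRoute1 and SDRoute2, should appear.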
+    Assert.assertFalse(TrafficRoutes.Collector.getResult().isEmpty());
+    for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
+      Assert.assertTrue(entry.getValue().getKey() <= 75);
+      Assert.assertTrue(entry.getValue().getKey() >= 55);
+      Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
new file mode 100644
index 0000000..9ba2f25
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.complete;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Testing the {@link TwitterAutoComplete} Application. To run this test, create an app
+ * at https://apps.twitter.com, generate your consumer key/secret and access token/secret,
+ * and set the following properties for the application before running it:
+ * your application consumer key,
+ * your application consumer secret,
+ * your Twitter access token, and
+ * your Twitter access token secret.
+ *
+ * This test is mainly for local demonstration purposes. By default the application runs
+ * for one minute; adjust the duration passed to run() below if you need more time.
+ */
+public class TwitterAutoCompleteTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(TwitterAutoCompleteTest.class);
+
+  @Test
+  @Ignore
+  public void testTwitterAutoComplete() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    //Uncomment the following lines and replace YOUR_XXX with the corresponding Twitter credentials.
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
+    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
+    lma.prepareDAG(new TwitterAutoComplete(), conf);
+    LocalMode.Controller lc = lma.getController();
+    long start = System.currentTimeMillis();
+    lc.run(60000); // Set your desired time to run the application here.
+    long end = System.currentTimeMillis();
+    long time = end - start;
+    logger.info("Test used {} ms", time);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
new file mode 100644
index 0000000..1e14fff
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link CombinePerKeyExamples}.
+ */
+public class CombinePerKeyExamplesTest
+{
+  @Test
+  public void testCombinePerKeyExamples() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true");
+    CombinePerKeyExamples app = new CombinePerKeyExamples();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return CombinePerKeyExamples.Collector.isDone();
+      }
+    });
+    lc.run(100000);
+
+    Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
new file mode 100644
index 0000000..7f93f50
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for {@link DeDupExample}.
+ */
+public class DeDupExampleTest
+{
+  @Test
+  public void testDeDupExample() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.application.DeDupExample.operator.console.silent", "true");
+    DeDupExample app = new DeDupExample();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return DeDupExample.Collector.isDone();
+      }
+    });
+    lc.run(50000);
+
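+    // The sample input under wordcount/word.txt contains exactly 9 distinct tokens
+    // (word1-word5, word7, word9, "error" and "bye"), hence 9 de-duplicated entries.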
+    Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
new file mode 100644
index 0000000..ec28b40
--- /dev/null
+++ b/examples/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamplesTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Test for MaxPerKeyExamples Application.
+ */
+public class MaxPerKeyExamplesTest
+{
+
+  private static final String INPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.InputPojo";
+  private static final String OUTPUT_TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.cookbook.OutputPojo";
+  private static final String DB_DRIVER = "org.h2.Driver";
+  private static final String DB_URL = "jdbc:h2:~/test";
+  private static final String INPUT_TABLE = "InputTable";
+  private static final String OUTPUT_TABLE = "OutputTable";
+  private static final String USER_NAME = "root";
+  private static final String PSW = "password";
+  private static final String QUERY = "SELECT * FROM " + INPUT_TABLE + ";";
+
+  private static final double[] MEANTEMPS = {85.3, 75.4};
+
+  @BeforeClass
+  public static void setup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+          + ")";
+      stmt.executeUpdate(createMetaTable);
+
+      String createInputTable = "CREATE TABLE IF NOT EXISTS " + INPUT_TABLE
+          + "(MONTH INT(2) not NULL, DAY INT(2), YEAR INT(4), MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createInputTable);
+
+      String createOutputTable = "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE
+          + "(MONTH INT(2) not NULL, MEANTEMP DOUBLE(10) )";
+      stmt.executeUpdate(createOutputTable);
+
+      String cleanTable = "TRUNCATE TABLE " + INPUT_TABLE;
+      stmt.executeUpdate(cleanTable);
+
+      stmt = con.createStatement();
+
+      String sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 21, 2014, 85.3)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (7, 20, 2014, 75.4)";
+      stmt.executeUpdate(sql);
+      sql = "INSERT INTO " + INPUT_TABLE + " VALUES (6, 18, 2014, 45.3)";
+      stmt.executeUpdate(sql);
+
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @AfterClass
+  public static void cleanup()
+  {
+    try {
+      Class.forName(DB_DRIVER).newInstance();
+
+      Connection con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String dropInputTable = "DROP TABLE " + INPUT_TABLE;
+      stmt.executeUpdate(dropInputTable);
+
+      String dropOutputTable = "DROP TABLE " + OUTPUT_TABLE;
+      stmt.executeUpdate(dropOutputTable);
+
+    } catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+
+  }
+
+  public void setConfig(Configuration conf)
+  {
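+    // The TUPLE_CLASS port attributes below tell the JDBC POJO input and output
+    // operators which POJO types to emit and to persist, respectively.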
+    conf.set("dt.operator.jdbcInput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcInput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcInput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS", INPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcInput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcInput.prop.tableName", INPUT_TABLE);
+    conf.set("dt.operator.jdbcInput.prop.query", QUERY);
+
+    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
+    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
+    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
+    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", OUTPUT_TUPLE_CLASS);
+    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
+    conf.set("dt.operator.jdbcOutput.prop.tablename", OUTPUT_TABLE);
+  }
+
+  public int getNumEntries()
+  {
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
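+      // Count distinct (MONTH, MEANTEMP) pairs so that duplicate rows from repeated
+      // test runs or replayed windows do not inflate the count.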
+      String countQuery = "SELECT count(DISTINCT (MONTH, MEANTEMP)) from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching count", e);
+    }
+  }
+
+  public Map<Integer, Double> getMaxMeanTemp()
+  {
+    Map<Integer, Double> result = new HashMap<>();
+    Connection con;
+    try {
+      con = DriverManager.getConnection(DB_URL, USER_NAME, PSW);
+      Statement stmt = con.createStatement();
+
+      String countQuery = "SELECT DISTINCT * from " + OUTPUT_TABLE;
+      ResultSet resultSet = stmt.executeQuery(countQuery);
+      while (resultSet.next()) {
+        result.put(resultSet.getInt("MONTH"), resultSet.getDouble("MEANTEMP"));
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new RuntimeException("fetching max mean temperatures", e);
+    }
+  }
+
+  @Test
+  public void testMaxPerKeyExample() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    setConfig(conf);
+
+    MaxPerKeyExamples app = new MaxPerKeyExamples();
+
+    lma.prepareDAG(app, conf);
+
+    LocalMode.Controller lc = lma.getController();
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return getNumEntries() == 2;
+      }
+    });
+
+    lc.run(5000);
+
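+    // Month 6 has readings 85.3 and 45.3 and month 7 has 75.4, so the per-month
+    // maxima should equal MEANTEMPS = {85.3, 75.4}.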
+    double[] result = new double[2];
+    result[0] = getMaxMeanTemp().get(6);
+    result[1] = getMaxMeanTemp().get(7);
+    Assert.assertArrayEquals(MEANTEMPS, result, 0.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/data/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/data/word.txt b/examples/highlevelapi/src/test/resources/data/word.txt
new file mode 100644
index 0000000..7e28409
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/data/word.txt
@@ -0,0 +1,2 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/log4j.properties b/examples/highlevelapi/src/test/resources/log4j.properties
new file mode 100644
index 0000000..592eb19
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=INFO
+#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+#log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/sampletweets.txt b/examples/highlevelapi/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..c113130
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/sampletweets.txt
@@ -0,0 +1,44 @@
+Last week, I published a blog announcing that Apex was accepted as an Apache incubator project.
+This week, I\u2019ll give you a little more detail on what Apache Apex is, and why it\u2019s important.
+
+Apache #Hadoop has been around for over a decade. It has become the de-facto big data platform,
+allowing enterprises to transform their business operations by turning big data into something useful, meaningful,
+and revenue-generating. #Hadoop promised the enablement of big data without incurring the costs you would normally think such powerful
+processing systems would demand. This tremendous promise of transforming business operations continues to fuel high growth in the industry.
+
+It all got started when Hadoop engineers at Yahoo! asked, \u201cHow can we build an efficient search indexing capability?\u201d
+The ensuing iterations and some inspiration resulted in the #MapReduce programming model. Although powerful, MapReduce wasn\u2019t perfect.
+
+Mastering MapReduce required a steep learning curve. Migrating applications to MapReduce required an almost complete rewrite.
+Equally worrisome was the fact that MapReduce had batch processing paradigm and \u201ccompute going to data\u201d at its core,
+thus posing a deterrent to Hadoop realizing its true potential.
+
+Expectedly enough, #MapReduce was an impediment that did little to bolster productization of big data.
+Not to be deterred, there were faster substitutes for MapReduce. Just like Hadoop, these models required deeper expertise, were tough to operate and difficult to master.
+As such, #Hadoop disrupted the way big data needs were handled, but remained largely under-productized.
+
+A decade after Hadoop was started, only a small percentage of big data projects are in production.
+Data is growing rapidly and the ability to harness big data has become a decisive competitive advantage.
+MapReduce impedes this demand (actually more of a scramble) to transform into a data-driven business.
+
+In hindsight, it is clear that in the early days, the subsequent success of Hadoop was not anticipated.
+If they had anticipated Hadoop\u2019s success, the question would have been, \u201cWhat can we do with massively distributed resources?\u201d
+The answer to this question, which came about soon after, was YARN (Hadoop 2.0), the next generation Hadoop.
+For the first time, #YARN brought the capability of exploring how distributed resources handling big data could perform \u201ca lot of things\u201d,
+thus going beyond the early MapReduce paradigm, and in a way beyond batch or even compute-going-to-data paradigms.
+YARN presented the capability to allow big data to not just become big in size, but broader in use cases. With its enabling capability as a Hadoop facilitator,
+YARN has pushed Hadoop towards realizing its true potential. The Hadoop predicament is similar to what
+cellphones would have been without the more popular features such as messaging and internet connectivity.
+
+
+In their early years, cellphones upset the landline market, but did not foster an immediate market furor till
+it transformed into the new-age \u201csmartphone\u201d with impressive features.
+YARN is most certainly the enabling factor for big data dreaming bigger and wider, and with it, Hadoop 2.0 is now a true de-facto distributed operating system.
+
+What\u2019s needed is bleeding edge YARN-based platforms capable of radically realizing Hadoop\u2019s potential
+
+Now is the right time to not only productize big data, but to see how setting it in motion can ensure realization of greater business goals.
+A Herculean task, this demands platforms that are easy to deploy, require nothing beyond everyday IT expertise, can effortlessly integrate with an existing IT infrastructure while ensuring ease of migration.
+The new-age Hadoop platforms need to be designed with an approach to reduce time-to-market by shortening the application lifecycle, from building to launching, thus quickening the realization of revenue for businesses.
+They will also have to reduce time for developers to develop, devOps to operationalize, and finally reduce time to insight for business.
+Platforms such as these will need to learn, adapt, and change to meet the burgeoning needs of the big data world.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/test/resources/wordcount/word.txt
----------------------------------------------------------------------
diff --git a/examples/highlevelapi/src/test/resources/wordcount/word.txt b/examples/highlevelapi/src/test/resources/wordcount/word.txt
new file mode 100644
index 0000000..edd0f51
--- /dev/null
+++ b/examples/highlevelapi/src/test/resources/wordcount/word.txt
@@ -0,0 +1,8 @@
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+word1 word2 word7 word9 word2 word3 word2 word4 word7 word3 word9 word9 word5 word5 word4 word2 word1 error
+bye
\ No newline at end of file