You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:22 UTC

[6/6] apex-malhar git commit: APEXMALHAR-2142 #comment Implement WindowedStream interface

APEXMALHAR-2142 #comment Implement WindowedStream interface

Add Accumulation interface support to High-Level API

Add name support for all the windowed transforms

Flex types in Composite transformation

Add more documents and logs


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/266b0411
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/266b0411
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/266b0411

Branch: refs/heads/master
Commit: 266b04116760dbd4d5cad6b4102b06153ac96a5f
Parents: 17f6c55
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jul 12 11:57:09 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Aug 25 09:26:03 2016 -0700

----------------------------------------------------------------------
 demos/highlevelapi/pom.xml                      |  55 ++
 demos/highlevelapi/src/assemble/appPackage.xml  |  59 ++
 .../sample/cookbook/CombinePerKeyExamples.java  | 237 ++++++++
 .../stream/sample/cookbook/TriggerExample.java  | 578 +++++++++++++++++++
 demos/pom.xml                                   |  12 +
 stream/pom.xml                                  |   1 -
 .../apex/malhar/stream/api/ApexStream.java      | 144 ++---
 .../stream/api/CompositeStreamTransform.java    |  30 +
 .../apache/apex/malhar/stream/api/Option.java   | 122 ++++
 .../apex/malhar/stream/api/WindowedStream.java  | 150 +++++
 .../malhar/stream/api/function/Function.java    |  36 +-
 .../malhar/stream/api/impl/ApexStreamImpl.java  | 194 +++----
 .../stream/api/impl/ApexWindowedStreamImpl.java | 275 +++++++++
 .../apex/malhar/stream/api/impl/DagMeta.java    |  53 +-
 .../malhar/stream/api/impl/IDGenerator.java     |   3 +
 .../malhar/stream/api/impl/StreamFactory.java   |  76 ++-
 .../stream/api/impl/TupleWrapperOperator.java   | 192 ++++++
 .../stream/api/impl/accumulation/Count.java     |  61 ++
 .../stream/api/impl/accumulation/FoldFn.java    |  65 +++
 .../stream/api/impl/accumulation/ReduceFn.java  |  65 +++
 .../stream/api/impl/accumulation/TopN.java      | 107 ++++
 .../api/operator/AnnonymousClassModifier.java   |   3 +
 .../api/operator/ByteArrayClassLoader.java      |   3 +
 .../stream/api/operator/FunctionOperator.java   | 112 ++--
 .../apex/malhar/stream/api/util/KeyedTuple.java |  34 --
 .../apex/malhar/stream/api/util/TupleUtil.java  |  21 +-
 .../FunctionOperator/FunctionOperatorTest.java  |   4 +-
 .../stream/api/impl/ApexStreamImplTest.java     |   6 +-
 .../stream/sample/ApplicationWithStreamAPI.java |  21 +-
 .../LocalTestWithoutStreamApplication.java      |  54 +-
 .../apex/malhar/stream/sample/MyStream.java     |   3 +-
 .../apex/malhar/stream/sample/MyStreamTest.java |  87 ++-
 .../malhar/stream/sample/TupleCollector.java    |   2 +
 .../apex/malhar/stream/sample/WCInput.java      |  90 +++
 .../stream/sample/WordCountWithStreamAPI.java   |  72 +++
 stream/src/test/resources/sampletweets.txt      | 207 +++++++
 36 files changed, 2839 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/pom.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/pom.xml b/demos/highlevelapi/pom.xml
new file mode 100644
index 0000000..c669681
--- /dev/null
+++ b/demos/highlevelapi/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>high-level-api-demo</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar High-Level API Demo</name>
+  <description>Apex demo applications that use High-level API to construct a dag</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-demos</artifactId>
+    <version>3.5.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>cglib</groupId>
+      <artifactId>cglib</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.9.1</version>
+    </dependency>
+  </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/assemble/appPackage.xml b/demos/highlevelapi/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/demos/highlevelapi/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
new file mode 100644
index 0000000..5d4c628
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * An example that reads the public 'Shakespeare' data, and for each word in
+ * the dataset that is at least {@code MIN_WORD_LENGTH} characters long, generates a string
+ * containing the list of play names in which that word appears.
+ *
+ * <p>Concepts: the combine transform, which lets you combine the values in a
+ * key-grouped Collection.
+ */
+public class CombinePerKeyExamples
+{
+  // Use the shakespeare public BigQuery sample
+  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
+  // We'll track words >= this word length across all plays in the table.
+  private static final int MIN_WORD_LENGTH = 9;
+
+  /**
+   * Examines each row in the input table. If the word is at least MIN_WORD_LENGTH characters
+   * long, outputs (word, play_name). Otherwise, including when the word is {@code null}, it
+   * returns {@code null}; {@code DropNullsFn} removes those placeholders from the stream.
+   */
+  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
+  {
+    @Override
+    public KeyValPair<String, String> f(SampleBean input)
+    {
+      String word = input.getWord();
+      if (word == null || word.length() < MIN_WORD_LENGTH) {
+        return null;
+      }
+      return new KeyValPair<>(word, input.getCorpus());
+    }
+  }
+
+  /**
+   * Removes the {@code null} placeholders emitted by {@code ExtractLargeWordsFn} for short words,
+   * so that only real (word, play_name) pairs reach the windowed reduce.
+   */
+  static class DropNullsFn implements Function.FilterFunction<KeyValPair<String, String>>
+  {
+    @Override
+    public boolean f(KeyValPair<String, String> input)
+    {
+      return input != null;
+    }
+  }
+
+  /**
+   * Joins two comma-separated play lists. Null or empty operands are treated as "no plays", so
+   * the result never starts or ends with a stray comma.
+   *
+   * @param left  accumulated play list (may be null or empty)
+   * @param right play list to append (may be null or empty)
+   * @return the combined list, or {@code ""} when both operands are null or empty
+   */
+  private static String joinPlays(String left, String right)
+  {
+    if (left == null || left.isEmpty()) {
+      return right == null ? "" : right;
+    }
+    if (right == null || right.isEmpty()) {
+      return left;
+    }
+    return left + "," + right;
+  }
+
+
+  /**
+   * Prepares the output data, which is carried in the same bean type as the input:
+   * the word goes into {@code word} and the joined play list into {@code all_plays}.
+   */
+  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
+  {
+    @Override
+    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
+    {
+      return new SampleBean(input.getValue().getKey(), input.getValue().getValue(), null);
+    }
+  }
+
+  /**
+   * Reads the public 'Shakespeare' data, and for each word in the dataset
+   * over a given length, generates a string containing the list of play names
+   * in which that word appears.
+   */
+  static class PlaysForWord
+      extends CompositeStreamTransform<SampleBean, SampleBean>
+  {
+
+    @Override
+    public ApexStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
+    {
+      return inputStream.map(new ExtractLargeWordsFn())
+          // Short words were mapped to null; drop them before they reach the reduce.
+          .filter(new DropNullsFn())
+          .window(new WindowOption.GlobalWindow())
+          .reduceByKey(new ReduceFn<String>()
+          {
+            @Override
+            public String defaultAccumulatedValue()
+            {
+              return "";
+            }
+
+            @Override
+            public String accumulate(String accumulatedValue, String input)
+            {
+              // The initial accumulator is "", so a plain concatenation would leave a leading comma.
+              return joinPlays(accumulatedValue, input);
+            }
+
+            @Override
+            public String merge(String accumulatedValue1, String accumulatedValue2)
+            {
+              return joinPlays(accumulatedValue1, accumulatedValue2);
+            }
+
+            @Override
+            public String getOutput(String accumulatedValue)
+            {
+              return accumulatedValue;
+            }
+
+            @Override
+            public String getRetraction(String value)
+            {
+              return value;
+            }
+          }, new Function.MapFunction<KeyValPair<String, String>, Tuple<KeyValPair<String, String>>>()
+          {
+            @Override
+            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
+            {
+              // The reduce consumes Tuples; wrap the (word, play) pair unchanged.
+              return new Tuple.PlainTuple<>(input);
+            }
+          })
+          .map(new FormatShakespeareOutputFn());
+    }
+  }
+
+
+  /**
+   * Row bean used both for input rows (word, corpus) and for output rows (word, all_plays).
+   */
+  public static class SampleBean
+  {
+
+    public SampleBean()
+    {
+
+    }
+
+    public SampleBean(String word, String all_plays, String corpus)
+    {
+      this.word = word;
+      this.all_plays = all_plays;
+      this.corpus = corpus;
+    }
+
+    private String word;
+
+    private String all_plays;
+
+    private String corpus;
+
+    public void setWord(String word)
+    {
+      this.word = word;
+    }
+
+    public String getWord()
+    {
+      return word;
+    }
+
+    public void setCorpus(String corpus)
+    {
+      this.corpus = corpus;
+    }
+
+    public String getCorpus()
+    {
+      return corpus;
+    }
+
+    public void setAll_plays(String all_plays)
+    {
+      this.all_plays = all_plays;
+    }
+
+    public String getAll_plays()
+    {
+      return all_plays;
+    }
+  }
+
+  /**
+   * Placeholder input operator exposing a {@link SampleBean} output port.
+   */
+  public static class SampleInput implements InputOperator
+  {
+
+    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort<>();
+
+    @Override
+    public void emitTuples()
+    {
+      // No data source is wired up yet; this placeholder emits nothing.
+    }
+
+    @Override
+    public void beginWindow(long l)
+    {
+
+    }
+
+    @Override
+    public void endWindow()
+    {
+
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+
+    }
+
+    @Override
+    public void teardown()
+    {
+
+    }
+  }
+
+
+  public static void main(String[] args) throws Exception
+  {
+    SampleInput input = new SampleInput();
+    // Only builds the stream graph; nothing here launches the application.
+    StreamFactory.fromInput(input, input.beanOutput).addCompositeStreams(new PlaysForWord());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
new file mode 100644
index 0000000..903f624
--- /dev/null
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample.cookbook;
+
+import java.util.Date;
+import java.util.Objects;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This example illustrates the basic concepts behind triggering. It shows how to use different
+ * trigger definitions to produce partial (speculative) results before all the data is processed and
+ * to control when updated results are produced for late data. The example performs a streaming
+ * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
+ * data into windows (see {@link WindowOption}) to be processed, and demonstrates using various
+ * kinds of triggers (see {@link TriggerOption}) to control when the results for
+ * each window are emitted.
+ *
+ * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
+ * readings from sensor stations set up along each freeway. Each sensor reading includes a
+ * calculation of the 'total flow' across all lanes in that freeway direction.
+ *
+ * <p> Concepts:
+ * <pre>
+ *   1. The default triggering behavior
+ *   2. Late data with the default trigger
+ *   3. How to get speculative estimates
+ *   4. Combining late data and speculative estimates
+ * </pre>
+ *
+ * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
+ * and understand the concept of 'late data',
+ * See:  <a href="https://cloud.google.com/dataflow/model/triggers">
+ * https://cloud.google.com/dataflow/model/triggers </a> and
+ * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
+ * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
+ *
+ * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
+ * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
+ * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
+ * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
+ * pipeline also randomly simulates late data, by setting the timestamps of some of the data
+ * elements to be in the past. You may override the default {@code --input} with the file of your
+ * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
+ * you to use a separate tool to publish to the given topic.
+ *
+ * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
+ * from the example common package (there are no defaults for a general Dataflow pipeline).
+ * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
+ * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
+ * the example will try to create them.
+ *
+ * <p> The pipeline outputs its results to a BigQuery table.
+ * Here are some queries you can use to see interesting results:
+ * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
+ * Replace {@code <enter_window_interval>} in the query below with the window interval.
+ *
+ * <p> To see the results of the default trigger,
+ * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
+ * the window duration, until the first pane of non-late data has been emitted, to see more
+ * interesting results.
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
+ *
+ * <p> To see the late data i.e. dropped by the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
+ * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p>To see the difference between accumulation mode and discarding mode,
+ * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
+ * (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
+ * window DESC, processing_time}
+ *
+ * <p> To see speculative results every minute,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
+ * ORDER BY window DESC, processing_time}
+ *
+ * <p> To see speculative results every five minutes after the end of the window
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
+ * and freeway = "5" ORDER BY window DESC, processing_time}
+ *
+ * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
+ * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
+ *
+ * <p> To reduce the number of results for each query we can add additional where clauses.
+ * For examples, To see the results of the default trigger,
+ * {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
+ * window = "<enter_window_interval>"}
+ *
+ * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
+ * and then exits.
+ */
+
+public class TriggerExample
+{
+  // Numeric value of the fixed (event-time) window duration, in minutes.
+  public static final int WINDOW_DURATION = 30;
+  // Constants used in triggers.
+  // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
+  // ONE_MINUTE is intended only for processing-time (early) firings before the end of the window.
+  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
+  // FIVE_MINUTES is intended only for processing-time (late) firings after the end of the window.
+  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
+  // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
+  public static final Duration ONE_DAY = Duration.standardDays(1);
+
+  /**
+   * This transform demonstrates using triggers to control when data is produced for each window
+   * Consider an example to understand the results generated by each type of trigger.
+   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
+   * element and processing time is the time when the data element gets processed in the pipeline.
+   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
+   * Key (freeway) | Value (total_flow) | event time | processing time
+   * 5             | 50                 | 10:00:03   | 10:00:47
+   * 5             | 30                 | 10:01:00   | 10:01:03
+   * 5             | 30                 | 10:02:00   | 11:07:00
+   * 5             | 20                 | 10:04:10   | 10:05:15
+   * 5             | 60                 | 10:05:00   | 11:03:00
+   * 5             | 20                 | 10:05:01   | 11:07:30
+   * 5             | 60                 | 10:15:00   | 10:27:15
+   * 5             | 40                 | 10:26:40   | 10:26:43
+   * 5             | 60                 | 10:27:20   | 10:27:25
+   * 5             | 60                 | 10:29:00   | 11:11:00
+   *
+   * <p> Dataflow tracks a watermark which records up to what point in event time the data is
+   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
+   * behind the current processing time. In practice, the actual value would vary over time based
+   * on the system's knowledge of the current PubSub delay and contents of the backlog (data
+   * that has not yet been processed).
+   *
+   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
+   * close at 10:44:59, when the watermark passes 10:30:00.
+   */
+  static class CalculateTotalFlow
+      extends CompositeStreamTransform<String, SampleBean>
+  {
+    // Length of each time window, in minutes.
+    private final int windowDuration;
+
+    /**
+     * @param windowDuration length of each time window, in minutes
+     */
+    CalculateTotalFlow(int windowDuration)
+    {
+      this.windowDuration = windowDuration;
+    }
+
+    /**
+     * Applies the same {@link TotalFlow} aggregation under four different trigger configurations
+     * (concepts #1 to #4 below).
+     *
+     * <p>NOTE(review): only the "sequential" branch is returned. The other three branches are
+     * built but not merged into the returned stream. TODO: combine them once a union/flatten
+     * transform is available.
+     *
+     * @param inputStream the incoming stream of records
+     * @return the result of the "sequential" (early + late firings, accumulating) branch
+     */
+    @Override
+    public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+    {
+      // Concept #1: The default triggering behavior
+      // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
+      // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
+
+      // The system also defaults to dropping late data -- data which arrives after the watermark
+      // has passed the event timestamp of the arriving element. This means that the default trigger
+      // will only fire once.
+
+      // Each pane produced by the default trigger with no allowed lateness will be the first and
+      // last pane in the window, and will be ON_TIME.
+
+      // The results for the example above with the default trigger and zero allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5             | 260                | 6                 | true    | true   | ON_TIME
+
+      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
+      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
+      // late, and dropped.
+
+      ApexStream<SampleBean> defaultTriggerResults = inputStream
+          .window(timeWindows(), new TriggerOption().discardingFiredPanes())
+          .addCompositeStreams(new TotalFlow("default"));
+
+      // Concept #2: Late data with the default trigger
+      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
+      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
+      // window. Any late data will result in an additional pane being fired for that same window.
+
+      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
+      // To definitely get the last pane when the window closes, use
+      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
+
+      // The results for the example above with the default trigger and ONE_DAY allowed lateness
+      // would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5             | 260                | 6                 | true    | false  | ON_TIME
+      // 5             | 60                 | 1                 | false   | false  | LATE
+      // 5             | 30                 | 1                 | false   | false  | LATE
+      // 5             | 20                 | 1                 | false   | false  | LATE
+      // 5             | 60                 | 1                 | false   | false  | LATE
+      ApexStream<SampleBean> withAllowedLatenessResults = inputStream
+          .window(timeWindows(), new TriggerOption().discardingFiredPanes(), ONE_DAY)
+          .addCompositeStreams(new TotalFlow("withAllowedLateness"));
+
+      // Concept #3: How to get speculative estimates
+      // We can specify a trigger that fires independent of the watermark, for instance after
+      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
+      // all the data is available. Since we don't have any triggers that depend on the watermark
+      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
+
+      // We also use accumulatingFiredPanes to build up the results across each pane firing.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5             | 80                 | 2                 | true    | false  | EARLY
+      // 5             | 100                | 3                 | false   | false  | EARLY
+      // 5             | 260                | 6                 | false   | false  | EARLY
+      // 5             | 320                | 7                 | false   | false  | LATE
+      // 5             | 370                | 9                 | false   | false  | LATE
+      // 5             | 430                | 10                | false   | false  | LATE
+
+      ApexStream<SampleBean> speculativeResults = inputStream
+          .window(timeWindows(),
+              // Trigger fires every minute. After emitting each pane, it continues accumulating the
+              // elements so that each approximation includes all of the previous data in addition
+              // to the newly arrived data.
+              new TriggerOption()
+                  .withEarlyFiringsAtEvery(ONE_MINUTE)
+                  .accumulatingFiredPanes(),
+              ONE_DAY)
+          .addCompositeStreams(new TotalFlow("speculative"));
+
+      // Concept #4: Combining late data and speculative estimates
+      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
+      // and LATE updates based on late data.
+
+      // Each time a triggering condition is satisfied it advances to the next trigger.
+      // If there are new elements this trigger emits a window under following condition:
+      // > Early approximations every minute till the end of the window.
+      // > An on-time firing when the watermark has passed the end of the window
+      // > Every five minutes of late data.
+
+      // Every pane produced will either be EARLY, ON_TIME or LATE.
+
+      // The results for the example above for this trigger would be:
+      // Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
+      // 5             | 80                 | 2                 | true    | false  | EARLY
+      // 5             | 100                | 3                 | false   | false  | EARLY
+      // 5             | 260                | 6                 | false   | false  | EARLY
+      // [First pane fired after the end of the window]
+      // 5             | 320                | 7                 | false   | false  | ON_TIME
+      // 5             | 430                | 10                | false   | false  | LATE
+
+      // For more possibilities of how to build advanced triggers, see TriggerOption.
+      ApexStream<SampleBean> sequentialResults = inputStream
+          .window(timeWindows(),
+              // Speculative every ONE_MINUTE, late firings every FIVE_MINUTES. After emitting each
+              // pane, it continues accumulating the elements so that each approximation includes
+              // all of the previous data in addition to the newly arrived data.
+              new TriggerOption()
+                  .withEarlyFiringsAtEvery(ONE_MINUTE)
+                  .withLateFiringsAtEvery(FIVE_MINUTES)
+                  .accumulatingFiredPanes(),
+              ONE_DAY)
+          .addCompositeStreams(new TotalFlow("sequential"));
+
+      return sequentialResults;
+    }
+
+    /** Creates a fresh time-window option of {@code windowDuration} minutes for one branch. */
+    private WindowOption.TimeWindows timeWindows()
+    {
+      return new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration));
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The remaining parts of the pipeline are needed to produce the output for each
+  // concept above. Not directly relevant to understanding the trigger examples.
+
+  /**
+   * Calculate total flow and number of records for each freeway and format each result as a
+   * {@link SampleBean} (via {@link FormatTotalFlow}) tagged with the trigger type.
+   */
+  static class TotalFlow extends
+      CompositeStreamTransform<String, SampleBean>
+  {
+    private String triggerType;
+
+    public TotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+    // NOTE(review): String/SampleBean are not ApexStream subtypes as CompositeStreamTransform's bounds require; confirm this compiles.
+    @Override
+    public ApexStream<SampleBean> compose(ApexStream<String> inputStream)
+    {
+      if (!(inputStream instanceof WindowedStream)) { // groupByKey is only declared on WindowedStream
+        throw new RuntimeException("Not supported here");
+      }
+      WindowedStream<String> windowedStream = (WindowedStream<String>)inputStream;
+      ApexStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = windowedStream
+          .groupByKey(new ExtractFlowInfo());
+      // NOTE(review): groupByKey takes a Function.ToKeyValue, which ExtractFlowInfo (a plain MapFunction) does not implement; verify.
+      return flowPerFreeway
+          .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
+          {
+            @Override
+            public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
+            {
+              Iterable<Integer> flows = input.getValue();
+              Integer sum = 0;
+              Long numberOfRecords = 0L;
+              for (Integer value : flows) {
+                sum += value;
+                numberOfRecords++;
+              }
+              return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords); // "sum,count", parsed by FormatTotalFlow
+            }
+          })
+          .map(new FormatTotalFlow(triggerType));
+    }
+  }
+
+  /**
+   * Format a "totalFlow,numberOfRecords" value produced by {@link TotalFlow} as a {@link SampleBean}.
+   * Sets the trigger type, freeway, totals and processing time (now); window, pane and timing fields stay null/false.
+   */
+  static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
+  {
+    private String triggerType;
+
+    public FormatTotalFlow(String triggerType)
+    {
+      this.triggerType = triggerType;
+    }
+    // parseInt/parseLong (or the values[1] lookup) throw if the value is not of the form "int,long".
+    @Override
+    public SampleBean f(KeyValPair<String, String> input)
+    {
+      String[] values = input.getValue().split(",");
+      //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
+      return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long
+          .parseLong(values[1]), null, false, false, null, null, new Date());
+    }
+  }
+
+  public static class SampleBean
+  {
+    public SampleBean()
+    {
+    }
+    // Public no-arg constructor plus getters/setters below: a mutable JavaBean-style POJO.
+    private String trigger_type;
+
+    private String freeway;
+
+    private int total_flow;
+
+    private long number_of_records;
+
+    private String window;
+
+    private boolean isFirst;
+
+    private boolean isLast;
+    // NOTE(review): timing is a Date, but the sample output in CalculateTotalFlow shows EARLY/ON_TIME/LATE labels; confirm type.
+    private Date timing;
+
+    private Date event_time;
+
+    private Date processing_time;
+    // equals/hashCode below take every field into account.
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SampleBean that = (SampleBean)o;
+      return total_flow == that.total_flow &&
+          number_of_records == that.number_of_records &&
+          isFirst == that.isFirst &&
+          isLast == that.isLast &&
+          Objects.equals(trigger_type, that.trigger_type) &&
+          Objects.equals(freeway, that.freeway) &&
+          Objects.equals(window, that.window) &&
+          Objects.equals(timing, that.timing) &&
+          Objects.equals(event_time, that.event_time) &&
+          Objects.equals(processing_time, that.processing_time);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects
+          .hash(trigger_type, freeway, total_flow, number_of_records, window, isFirst, isLast, timing, event_time,
+              processing_time);
+    }
+
+    public SampleBean(String trigger_type, String freeway, int total_flow, long number_of_records, String window,
+        boolean isFirst, boolean isLast, Date timing, Date event_time, Date processing_time)
+    {
+
+      this.trigger_type = trigger_type;
+      this.freeway = freeway;
+      this.total_flow = total_flow;
+      this.number_of_records = number_of_records;
+      this.window = window;
+      this.isFirst = isFirst;
+      this.isLast = isLast;
+      this.timing = timing;
+      this.event_time = event_time;
+      this.processing_time = processing_time;
+    }
+
+    public String getTrigger_type()
+    {
+      return trigger_type;
+    }
+
+    public void setTrigger_type(String trigger_type)
+    {
+      this.trigger_type = trigger_type;
+    }
+
+    public String getFreeway()
+    {
+      return freeway;
+    }
+
+    public void setFreeway(String freeway)
+    {
+      this.freeway = freeway;
+    }
+
+    public int getTotal_flow()
+    {
+      return total_flow;
+    }
+
+    public void setTotal_flow(int total_flow)
+    {
+      this.total_flow = total_flow;
+    }
+
+    public long getNumber_of_records()
+    {
+      return number_of_records;
+    }
+
+    public void setNumber_of_records(long number_of_records)
+    {
+      this.number_of_records = number_of_records;
+    }
+
+    public String getWindow()
+    {
+      return window;
+    }
+
+    public void setWindow(String window)
+    {
+      this.window = window;
+    }
+
+    public boolean isFirst()
+    {
+      return isFirst;
+    }
+
+    public void setFirst(boolean first)
+    {
+      isFirst = first;
+    }
+
+    public boolean isLast()
+    {
+      return isLast;
+    }
+
+    public void setLast(boolean last)
+    {
+      isLast = last;
+    }
+
+    public Date getTiming()
+    {
+      return timing;
+    }
+
+    public void setTiming(Date timing)
+    {
+      this.timing = timing;
+    }
+
+    public Date getEvent_time()
+    {
+      return event_time;
+    }
+
+    public void setEvent_time(Date event_time)
+    {
+      this.event_time = event_time;
+    }
+
+    public Date getProcessing_time()
+    {
+      return processing_time;
+    }
+
+    public void setProcessing_time(Date processing_time)
+    {
+      this.processing_time = processing_time;
+    }
+  }
+
+  /**
+   * Extract the freeway (column index 2) and total flow (column index 7) from a comma-separated reading.
+   * Freeway is the key since total flow is computed per freeway; returns null for rows that should be skipped.
+   */
+  static class ExtractFlowInfo implements Function.MapFunction<String, KeyValPair<String, Integer>>
+  {
+    @Override
+    public KeyValPair<String, Integer> f(String input)
+    {
+      String[] laneInfo = input.split(",");
+      if (laneInfo[0].equals("timestamp")) {
+        // Header row
+        return null;
+      }
+      if (laneInfo.length < 48) {
+        // Skip rows with fewer than 48 fields as invalid input.
+        return null;
+      }
+      String freeway = laneInfo[2];
+      Integer totalFlow = tryIntegerParse(laneInfo[7]);
+      // Ignore the records with total flow 0 to easily understand the working of triggers.
+      // Skip records with total flow -1 (invalid input) or a flow value that is not an integer.
+      if (totalFlow == null || totalFlow <= 0) {
+        return null;
+      }
+      return new KeyValPair<>(freeway, totalFlow);
+    }
+  }
+
+  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms"; // NOTE(review): no use is visible in this part of the file; verify it is still needed.
+
+  public static void main(String[] args) throws Exception
+  {
+    StreamFactory.fromFolder("some folder")
+        .addCompositeStreams(new CalculateTotalFlow(60));
+    // NOTE(review): "some folder" is a placeholder path and the composed stream is never run (see ApexStream#run); confirm intent.
+  }
+
+  private static Integer tryIntegerParse(String number)
+  {
+    try {
+      return Integer.parseInt(number);
+    } catch (NumberFormatException e) {
+      return null; // not an integer: callers such as ExtractFlowInfo treat null as "skip"
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/demos/pom.xml
----------------------------------------------------------------------
diff --git a/demos/pom.xml b/demos/pom.xml
index 12f0f14..e9f2daf 100644
--- a/demos/pom.xml
+++ b/demos/pom.xml
@@ -192,6 +192,7 @@
     <module>r</module>
     <module>echoserver</module>
     <module>iteration</module>
+    <module>highlevelapi</module>
   </modules>
 
   <dependencies>
@@ -231,6 +232,17 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>malhar-stream</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/pom.xml
----------------------------------------------------------------------
diff --git a/stream/pom.xml b/stream/pom.xml
index fd663e0..445be92 100755
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -95,6 +95,5 @@
       <version>3.2.1</version>
     </dependency>
 
-
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
index 2f65ba9..6d44534 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -18,11 +18,14 @@
  */
 package org.apache.apex.malhar.stream.api;
 
-
-import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context.DAGContext;
@@ -37,6 +40,7 @@ import com.datatorrent.api.Operator;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public interface ApexStream<T>
 {
   /**
@@ -46,17 +50,7 @@ public interface ApexStream<T>
    * @param <O> Type of the output
    * @return new stream of type O
    */
-  <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction);
-
-  /**
-   * Simple map transformation<br>
-   * Add an operator to the DAG which convert tuple T to tuple O
-   * @param name operator name
-   * @param mapFunction map function
-   * @param <O> Type of the output
-   * @return new stream of type O
-   */
-  <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mapFunction);
+  <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction, Option... opts);
 
   /**
    * Flat map transformation
@@ -65,17 +59,7 @@ public interface ApexStream<T>
    * @param <O> Type of the output
    * @return new stream of type O
    */
-  <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten);
-
-  /**
-   * Flat map transformation<br>
-   * Add an operator to the DAG which convert tuple T to a collection of tuple O
-   * @param name operator name
-   * @param flatten
-   * @param <O> Type of the output
-   * @return new stream of type O
-   */
-  <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten);
+  <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten, Option... opts);
 
   /**
    * Filter transformation<br>
@@ -83,76 +67,7 @@ public interface ApexStream<T>
    * @param filter filter function
    * @return new stream of same type
    */
-  <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter);
-
-  /**
-   * Filter transformation<br>
-   * Add an operator to the DAG which filter out tuple T that cannot satisfy the FilterFunction
-   * @param name operator name
-   * @param filter filter function
-   * @return new stream of same type
-   */
-  <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter);
-
-  /**
-   * Reduce transformation<br>
-   * Add an operator to the DAG which merge tuple t1, t2 to new tuple
-   * @param reduce reduce function
-   * @return new stream of same type
-   */
-  <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce);
-
-  /**
-   * Reduce transformation<br>
-   * Add an operator to the DAG which merge tuple t1, t2 to new tuple
-   * @param name operator name
-   * @param reduce reduce function
-   * @return new stream of same type
-   */
-  <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce);
-
-  /**
-   * Fold transformation<br>
-   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
-   * @param initialValue initial result value
-   * @param fold fold function
-   * @param <O> Result type
-   * @return new stream of type O
-   */
-  <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold);
-
-  /**
-   * Fold transformation<br>
-   * Add an operator to the DAG which merge tuple T to accumulated result tuple O
-   * @param name name of the operator
-   * @param initialValue initial result value
-   * @param fold fold function
-   * @param <O> Result type
-   * @return new stream of type O
-   */
-  <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold);
-
-  /**
-   * Count of all tuples
-   * @return new stream of Integer
-   */
-  <STREAM extends ApexStream<Integer>> STREAM count();
-
-  /**
-   * Count tuples by the key<br>
-   * If the input is KeyedTuple it will get the key from getKey method from the tuple<br>
-   * If not, use the tuple itself as a key
-   * @return new stream of Map
-   */
-  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey();
-
-  /**
-   *
-   * Count tuples by the indexed key
-   * @param key the index of the field in the tuple that are used as key
-   * @return new stream of Map
-   */
-  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key);
+  <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter, Option... opts);
 
   /**
    * Extend the dag by adding one operator<br>
@@ -162,18 +77,23 @@ public interface ApexStream<T>
    * @param <O> type of the output
    * @return new stream of type O
    */
-  <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort,  Operator.OutputPort<O> outputPort);
+  <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort,  Operator.OutputPort<O> outputPort, Option... opts);
 
   /**
-   * Extend the dag by adding one {@see Operator}
-   * @param opName Operator name
+   * Extend the dag by adding one end operator<br>
    * @param op Operator added to the stream
    * @param inputPort InputPort of the operator that is connected to last exposed OutputPort in the stream
-   * @param outputPort OutputPort of the operator will be connected to next operator
    * @param <O> type of the output
    * @return new stream of type O
    */
-  <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort,  Operator.OutputPort<O> outputPort);
+  <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts);
+
+  /**
+   * Extend the dag by adding one {@link CompositeStreamTransform}
+   * @param compositeStreamTransform Composite Streams and Transforms
+   * @return new stream of type O
+   */
+  <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform);
 
   /**
    * Union multiple stream into one
@@ -260,4 +180,30 @@ public interface ApexStream<T>
    */
   void run();
 
+  /**
+   * Chunk tuples into Windows
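+   * Window transforms are defined in {@link WindowedStream}
+   * @param windowOption the window definition (e.g. time windows)
+   * @return a {@link WindowedStream} on which window transforms can be applied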
+   */
+  WindowedStream<T> window(WindowOption windowOption);
+
+  /**
+   * Chunk tuple into windows with window option and trigger option
+   * @param windowOption
+   * @param triggerOption
+   * @return
+   */
+  WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption);
+
+  /**
+   *
+   * Chunk tuple into windows with window option and trigger option and allowed lateness
+   * @param windowOption
+   * @param triggerOption
+   * @param allowLateness
+   * @return
+   */
+  WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness);
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
new file mode 100644
index 0000000..979f44f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/CompositeStreamTransform.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A group of streams and transforms applied as one step via {@link ApexStream#addCompositeStreams}; subclasses implement {@link #compose}.
+ */
+@InterfaceStability.Evolving
+public abstract class CompositeStreamTransform<INSTREAM extends ApexStream, OUTSTREAM extends ApexStream>
+{
+  public abstract OUTSTREAM compose(INSTREAM inputStream);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
new file mode 100644
index 0000000..1b8935f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Attribute;
+
+/**
+ * Options for an operator added to the dag (name, property or attribute), passed as {@code Option... opts} to stream transforms
+ */
+@InterfaceStability.Evolving
+public interface Option
+{
+  /** Factory methods for the {@link Option} implementations below. */
+  class Options
+  {
+    public static Option name(String name)
+    {
+      return new OpName(name);
+    }
+
+    public static Option prop(String name, Object value)
+    {
+      return new PropSetting(name, value);
+    }
+
+    public static <T> Option attr(Attribute<T> attr, T obj)
+    {
+      return new AttributeSetting<>(attr, obj);
+    }
+  }
+
+  /**
+   * An option used to set the name of the operator
+   */
+  class OpName implements Option
+  {
+
+    private String name;
+
+    public OpName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+  }
+
+  /**
+   * An option used to set the property value of the operator
+   */
+  class PropSetting implements Option
+  {
+
+    private String name;
+
+    private Object val;
+
+    public PropSetting(String name, Object val)
+    {
+      this.name = name;
+      this.val = val;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public Object getVal()
+    {
+      return val;
+    }
+  }
+
+  /**
+   * An option used to set the {@link Attribute}
+   * @param <T> type of the attribute value
+   */
+  class AttributeSetting<T> implements Option
+  {
+    private Attribute<T> attr;
+
+    private T value;
+
+    public AttributeSetting(Attribute<T> attr, T value)
+    {
+      this.attr = attr;
+      this.value = value;
+    }
+
+    public Attribute<T> getAttr()
+    {
+      return attr;
+    }
+
+    public T getValue()
+    {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index 748a76a..bc99035 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -18,6 +18,22 @@
  */
 package org.apache.apex.malhar.stream.api;
 
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * <p>
  * A stream with windowed transformation
@@ -51,6 +67,140 @@ package org.apache.apex.malhar.stream.api;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public interface WindowedStream<T> extends ApexStream<T>
 {
+
+  /**
+   * Count of all tuples
+   * @return new stream of windowed Long counts
+   */
+  <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts);
+
+  /**
+   * Count tuples by the key<br>
+   * @param opts operator options, e.g. the operator name
+   * @param convertToKeyValue The function that converts a plain tuple to a k,v pair
+   * @return new stream of windowed key/count pairs
+   */
+  <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts);
+
+  /**
+   * Return top N tuples by the selected key
+   * @param N how many tuples you want to keep
+   * @param opts operator options, e.g. the operator name
+   * @param convertToKeyVal The function that converts a plain tuple to a k,v pair
+   * @return new stream of each key and its top N values
+   */
+  <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+  /**
+   * Return top N tuples of all tuples in the window
+   * @param N how many tuples you want to keep
+   * @param opts operator options, e.g. the operator name
+   * @return new stream of topN
+   */
+  <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts);
+
+  /**
+   * Add {@link KeyedWindowedOperatorImpl} with specified {@link Accumulation} <br>
+   * Accumulate tuples by some key within the window definition in this stream
+   * Operator options such as the name are passed via {@code opts}
+   * @param accumulation Accumulation function you want to do
+   * @param convertToKeyVal The function that converts a plain tuple to a k,v pair
+   * @param <K> The type of the key used to group tuples
+   * @param <V> The type of value you want to do accumulation on
+   * @param <O> The output type for each given key that you want to accumulate the value to
+   * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk)
+   * @param <STREAM> return type
+   * @return new stream of windowed key/accumulated-output pairs
+   */
+  <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation,
+      Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+  /**
+   * Add {@link WindowedOperatorImpl} with specified {@link Accumulation} <br>
+   * Accumulate all tuples (not keyed) within the window definition in this stream
+   * Operator options such as the name are passed via {@code opts}
+   * @param accumulation Accumulation function you want to do
+   * @param <O> The output type that you want to accumulate the value to
+   * @param <ACCU> The type of accumulation you want to keep (it can be in memory or on disk)
+   * @param <STREAM> return type
+   * @return new stream of windowed accumulated outputs
+   */
+  <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts);
+
+  /**
+   * Add {@link WindowedOperatorImpl} with specified {@link ReduceFn} <br>
+   * Do reduce transformation<br>
+   * @param reduce reduce function
+   * @param <STREAM> return type
+   * @return new stream of same type
+   */
+  <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... opts);
+
+  /**
+   * Add {@link KeyedWindowedOperatorImpl} with specified {@link ReduceFn} <br>
+   * Reduce transformation by selected key <br>
+   * Add an operator to the DAG which merges tuples t1, t2 into a new tuple by key
+   * @param reduce reduce function
+   * @param convertToKeyVal The function that converts a plain tuple to a k,v pair
+   * @param <K> The type of key you want to group tuples by
+   * @param <V> The type of value extracted from tuple T
+   * @param <STREAM> return type
+   * @return new stream of key value pair
+   */
+  <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+
+  /**
+   * Add {@link WindowedOperatorImpl} with specified {@link FoldFn} <br>
+   * Fold transformation <br>
+   * @param fold fold function
+   * @param <O> output type of fold function
+   * @param <STREAM> return type
+   * @return new stream of windowed fold results
+   */
+  <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts);
+
+  /**
+   * Add {@link KeyedWindowedOperatorImpl} with specified {@link FoldFn} <br>
+   * Fold transformation by key <br>
+   * @param fold fold function
+   * @param <O> Result type
+   * @return new stream of windowed key/fold-result pairs
+   */
+  <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts);
+
+  // NOTE(review): unlike the transforms above, groupByKey/group return tuples not wrapped in Tuple.WindowedTuple; confirm intended.
+  /**
+   * Return tuples for each key for each window
+   * @param <O> type of the values extracted by {@code convertToKeyVal}
+   * @param <K> type of the key tuples are grouped by
+   * @param <STREAM> return type
+   * @return new stream of each key with the values grouped under it
+   */
+  <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... opts);
+
+  /**
+   * Return tuples for each window
+   * @param <STREAM> return type
+   * @return new stream of the tuples grouped per window
+   */
+  <STREAM extends WindowedStream<Iterable<T>>> STREAM group();
+
+  /**
+   * Reset the trigger settings for next transforms
+   * @param triggerOption the trigger option used by subsequent transforms
+   * @param <STREAM> return type
+   */
+  <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption triggerOption);
+
+  /**
+   * Reset the allowedLateness settings for next transforms
+   * @param allowedLateness the allowed lateness used by subsequent transforms
+   * @param <STREAM> return type
+   */
+  <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness);
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
index f4e5e60..d516064 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
@@ -18,11 +18,24 @@
  */
 package org.apache.apex.malhar.stream.api.function;
 
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
- * The top level function interface
+ * The top level function interface <br>
+ * The function is wrapped by {@link FunctionOperator} <br>
+ * It takes input from input port of {@link FunctionOperator} ex. {@link FunctionOperator.MapFunctionOperator#input} <br>
+ * And the output will be emitted using {@link FunctionOperator#tupleOutput} <br>
+ * Anonymous functions are not fully supported: they must be <b>stateless</b> and should not be defined in any static context<br>
+ * If an anonymous function does not work, use a top-level function class instead<br>
+ * A top-level function class should have a public no-arg constructor
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public interface Function
 {
   /**
@@ -45,26 +58,18 @@ public interface Function
   }
 
   /**
-   * An interface defines a reduce transformation
+   * A special map function that converts any POJO into a {@link Tuple} wrapping a key value pair
    * @param <T>
+   * @param <K>
+   * @param <V>
    */
-  public static interface ReduceFunction<T> extends Function
+  public static interface ToKeyValue<T, K, V> extends MapFunction<T, Tuple<KeyValPair<K, V>>>
   {
-    T reduce(T t1, T t2);
-  }
 
-  /**
-   * An interface that defines a fold transformation
-   * @param <I>
-   * @param <O>
-   */
-  public static interface FoldFunction<I, O> extends Function
-  {
-    O fold(I input, O output);
   }
 
   /**
-   * An interface that defines flatmap transforation
+   * An interface that defines flatmap transformation
    * @param <I>
    * @param <O>
    */
@@ -76,7 +81,8 @@ public interface Function
    * An interface that defines filter transformation
    * @param <T>
    */
-  public static interface FilterFunction<T> extends MapFunction<T, Boolean>
+  public static interface FilterFunction<T> extends Function
   {
+    boolean f(T input);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
index 2ff6d51..032cb03 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -27,28 +27,39 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.WindowOption;
+
 import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.WindowedStream;
 import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.function.Function.FlatMapFunction;
 import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
-import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 
 /**
- * Default stream implementation for ApexStream interface.
- * It creates the dag(execution plan) from stream api
+ * Default stream implementation of the {@link ApexStream} interface. <br>
+ * It creates the DAG (execution plan) from the stream API. <br>
+ * The DAG is not constructed until {@link #populateDag(DAG)} is called.
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class ApexStreamImpl<T> implements ApexStream<T>
 {
 
@@ -135,18 +146,17 @@ public class ApexStreamImpl<T> implements ApexStream<T>
     }
   }
 
+
   /**
    * Graph behind the stream
    */
-  private DagMeta graph;
-
-  private ApexStream<T> delegator;
+  protected DagMeta graph;
 
   /**
-   * Right now the stream only support single extend point
-   * You can have multiple downstream operators connect to this single extend point though
+   * Right now the stream only supports a single extension point.
+   * Multiple downstream operators can still connect to this single extension point.
    */
-  private Brick<T> lastBrick;
+  protected Brick<T> lastBrick;
 
   public Brick<T> getLastBrick()
   {
@@ -163,13 +173,11 @@ public class ApexStreamImpl<T> implements ApexStream<T>
     graph = new DagMeta();
   }
 
-  public ApexStreamImpl(ApexStream<T> apexStream)
+  public ApexStreamImpl(ApexStreamImpl<T> apexStream)
   {
-    this.delegator = apexStream;
-    if (delegator != null && delegator instanceof ApexStreamImpl) {
-      graph = ((ApexStreamImpl)delegator).graph;
-      lastBrick = ((ApexStreamImpl<T>)delegator).lastBrick;
-    }
+    // share the graph and the extension point (last brick) of the given stream
+    graph = apexStream.graph;
+    lastBrick = apexStream.lastBrick;
   }
 
   public ApexStreamImpl(DagMeta graph)
@@ -184,128 +192,52 @@ public class ApexStreamImpl<T> implements ApexStream<T>
   }
 
   @Override
-  public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf)
-  {
-    return map(mf.toString(), mf);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mf)
+  public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf, Option... opts)
   {
     FunctionOperator.MapFunctionOperator<T, O> opt = new FunctionOperator.MapFunctionOperator<>(mf);
-    return (STREAM)addOperator(name, opt, opt.input, opt.output);
+    return addOperator(opt, opt.input, opt.output, opts);
   }
 
-  @Override
-  public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten)
-  {
-    return flatMap(flatten.toString(), flatten);
-  }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten)
+  public <O, STREAM extends ApexStream<O>> STREAM flatMap(FlatMapFunction<T, O> flatten, Option... opts)
   {
     FunctionOperator.FlatMapFunctionOperator<T, O> opt = new FunctionOperator.FlatMapFunctionOperator<>(flatten);
-    return (STREAM)addOperator(name, opt, opt.input, opt.output);
-  }
-
-  @Override
-  public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter)
-  {
-    return filter(filter.toString(), filter);
+    return addOperator(opt, opt.input, opt.output, opts);
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public <STREAM extends ApexStream<T>> STREAM filter(String name, final Function.FilterFunction<T> filter)
+  public <STREAM extends ApexStream<T>> STREAM filter(final Function.FilterFunction<T> filter, Option... opts)
   {
     FunctionOperator.FilterFunctionOperator<T> filterFunctionOperator = new FunctionOperator.FilterFunctionOperator<>(filter);
-    return (STREAM)addOperator(name, filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output);
-  }
-
-  @Override
-  public <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce)
-  {
-    return reduce(reduce.toString(), reduce);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce)
-  {
-    FunctionOperator.ReduceFunctionOperator<T> opt = new FunctionOperator.ReduceFunctionOperator<>(reduce);
-    return (STREAM)addOperator(name, opt, opt.input, opt.output);
-  }
-
-  @Override
-  public <O, STREAM extends ApexStream<O>> STREAM fold(final O initialValue, Function.FoldFunction<T, O> fold)
-  {
-    return fold(fold.toString(), initialValue, fold);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold)
-  {
-    FunctionOperator.FoldFunctionOperator<T, O> opt = new FunctionOperator.FoldFunctionOperator<>(fold, initialValue);
-    return (STREAM)addOperator(name, opt, opt.input, opt.output);
-  }
-
-  @Override
-  public <STREAM extends ApexStream<Integer>> STREAM count()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key)
-  {
-    throw new UnsupportedOperationException();
+    return addOperator(filterFunctionOperator, filterFunctionOperator.input, filterFunctionOperator.output, opts);
   }
 
-  @Override
-  @SuppressWarnings("unchecked")
-  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey()
+  public <STREAM extends ApexStream<Map.Entry<Object, Integer>>> STREAM countByElement()
   {
-    // Needs to change the unique counter to support keys
-    UniqueCounter<Object> uniqueCounter = new UniqueCounter<>();
-    uniqueCounter.setCumulative(true);
-    Operator.OutputPort<? extends Map<Object, Integer>> resultPort = uniqueCounter.count;
-    return (STREAM)addOperator("CounterByKey", uniqueCounter, (Operator.InputPort<T>)uniqueCounter.data, resultPort);
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+  public <O, STREAM extends ApexStream<O>> STREAM endWith(Operator op, Operator.InputPort<T> inputPort, Option... opts)
   {
-    return addOperator(op.toString(), op, inputPort, outputPort);
+    return (STREAM)addOperator(op, inputPort, null, opts);
   }
 
-
   @Override
   @SuppressWarnings("unchecked")
-  public <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+  public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts)
   {
 
-    if (delegator != null) {
-      ApexStreamImpl<O> apexStream = delegator.addOperator(opName, op, inputPort, outputPort);
-      try {
-        return (STREAM)this.getClass().getConstructor(ApexStream.class).newInstance(apexStream);
-      } catch (Exception e) {
-        throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as delegator");
-      }
-    }
-
     checkArguments(op, inputPort, outputPort);
 
     DagMeta.NodeMeta nm = null;
 
     if (lastBrick == null) {
-      nm = graph.addNode(opName, op, null, null, inputPort);
+      nm = graph.addNode(op, null, null, inputPort, opts);
     } else {
-
-      nm = graph.addNode(opName, op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort);
+      nm = graph.addNode(op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort, opts);
     }
 
     Brick<O> newBrick = new Brick<>();
@@ -315,9 +247,25 @@ public class ApexStreamImpl<T> implements ApexStream<T>
       newBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(lastBrick.lastOutput, inputPort);
     }
 
-    return (STREAM)new ApexStreamImpl<>(this.graph, newBrick);
+    if (this.getClass() == ApexStreamImpl.class || this.getClass() == ApexWindowedStreamImpl.class) {
+      return (STREAM)newStream(this.graph, newBrick);
+    } else {
+      try {
+        return (STREAM)this.getClass().getConstructor(ApexStreamImpl.class).newInstance(newStream(this.graph, newBrick));
+      } catch (Exception e) {
+        throw new RuntimeException("You have to provide a public constructor that takes an ApexStreamImpl as its only parameter", e);
+      }
+    }
+
+  }
+
+  @Override
+  public <O, INSTREAM extends ApexStream<T>, OUTSTREAM extends ApexStream<O>> OUTSTREAM addCompositeStreams(CompositeStreamTransform<INSTREAM, OUTSTREAM> compositeStreamTransform)
+  {
+    return compositeStreamTransform.compose((INSTREAM)this);
   }
 
+
   /* Check to see if inputPort and outputPort belongs to the operator */
   private void checkArguments(Operator op, Operator.InputPort inputPort, Operator.OutputPort outputPort)
   {
@@ -362,8 +310,8 @@ public class ApexStreamImpl<T> implements ApexStream<T>
   public ApexStreamImpl<T> print()
   {
     ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator();
-    addOperator(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass()), consoleOutputOperator,
-        (Operator.InputPort<T>)consoleOutputOperator.input, null);
+    addOperator(consoleOutputOperator,
+        (Operator.InputPort<T>)consoleOutputOperator.input, null, Option.Options.name(IDGenerator.generateOperatorIDWithUUID(consoleOutputOperator.getClass())));
     return this;
   }
 
@@ -493,5 +441,50 @@ public class ApexStreamImpl<T> implements ApexStream<T>
     //TODO need an api to submit the StreamingApplication to cluster
   }
 
+  /**
+   * Windows this stream with the given window option, passing null for the trigger option and the allowed lateness.
+   */
+  @Override
+  public WindowedStream<T> window(WindowOption option)
+  {
+    return window(option, null, null);
+  }
+
+  /**
+   * Windows this stream with the given window and trigger options, passing null for the allowed lateness.
+   */
+  @Override
+  public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption)
+  {
+    return window(windowOption, triggerOption, null);
+  }
+
+  /**
+   * Creates a windowed stream that shares this stream's graph and extension point (last brick) and carries
+   * the given window option, trigger option and allowed lateness.
+   */
+  @Override
+  public WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness)
+  {
+    ApexWindowedStreamImpl<T> windowedStream = new ApexWindowedStreamImpl<>();
+    windowedStream.lastBrick = lastBrick;
+    windowedStream.graph = graph;
+    windowedStream.windowOption = windowOption;
+    windowedStream.triggerOption = triggerOption;
+    windowedStream.allowedLateness = allowLateness;
+    return windowedStream;
+  }
+
+  /**
+   * Creates the stream returned by a transformation: a plain {@link ApexStreamImpl} sharing {@code graph} whose
+   * extension point is {@code newBrick}. Subclasses override this to return their own stream type.
+   */
+  protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick)
+  {
+    ApexStreamImpl<O> newstream = new ApexStreamImpl<>();
+    newstream.graph = graph;
+    newstream.lastBrick = newBrick;
+    return newstream;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
new file mode 100644
index 0000000..a293ea8
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
+import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
+import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
+import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.apex.malhar.stream.api.WindowedStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+
+import org.apache.apex.malhar.stream.api.impl.accumulation.Count;
+import org.apache.apex.malhar.stream.api.impl.accumulation.FoldFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.ReduceFn;
+import org.apache.apex.malhar.stream.api.impl.accumulation.TopN;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Default implementation of the {@link WindowedStream} interface. <br>
+ * It extends {@link ApexStreamImpl} with windowed transformations: each transformation maps the stream into
+ * {@link Tuple}s and appends a {@link WindowedOperatorImpl} (or a {@link KeyedWindowedOperatorImpl} for the
+ * keyed variants) configured with this stream's window option, trigger option and allowed lateness.
+ *
+ * @since 3.4.0
+ */
+@InterfaceStability.Evolving
+public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements WindowedStream<T>
+{
+
+  /**
+   * Window option set on the windowed operators created from this stream; not set on them when null.
+   */
+  protected WindowOption windowOption;
+
+  /**
+   * Trigger option set on the windowed operators created from this stream; not set on them when null.
+   */
+  protected TriggerOption triggerOption;
+
+  /**
+   * Allowed lateness set on the windowed operators created from this stream; not set on them when null.
+   */
+  protected Duration allowedLateness;
+
+  /**
+   * Wraps each input into a {@link Tuple}: inputs that already are {@link Tuple.TimestampedTuple}s pass through
+   * unchanged, any other input is wrapped in a {@link Tuple.TimestampedTuple} stamped with the current system time.
+   * <br>
+   * This is deliberately a <b>static</b> nested class: a non-static member class keeps a hidden reference to the
+   * enclosing stream (and through it the DAG metadata) and has no no-arg constructor, which function classes are
+   * expected to provide.
+   */
+  private static class ConvertFn<T> implements Function.MapFunction<T, Tuple<T>>
+  {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Tuple<T> f(T input)
+    {
+      if (input instanceof Tuple.TimestampedTuple) {
+        return (Tuple.TimestampedTuple)input;
+      } else {
+        return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), input);
+      }
+    }
+  }
+
+  /**
+   * Creates an empty windowed stream; the graph, the last brick and the window settings are assigned by
+   * whoever creates the instance, see {@link #newStream(DagMeta, Brick)}.
+   */
+  public ApexWindowedStreamImpl()
+  {
+  }
+
+  /**
+   * Counts the tuples of each window. Every input is mapped to {@code 1L}, keeping the input's timestamp when it
+   * is a {@link Tuple.TimestampedTuple} and using the current system time otherwise, and the ones are
+   * accumulated with {@link Count}.
+   */
+  @Override
+  public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts)
+  {
+    Function.MapFunction<T, Tuple<Long>> kVMap = new Function.MapFunction<T, Tuple<Long>>()
+    {
+      @Override
+      public Tuple<Long> f(T input)
+      {
+        if (input instanceof Tuple.TimestampedTuple) {
+          return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)input).getTimestamp(), 1L);
+        } else {
+          return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), 1L);
+        }
+      }
+    };
+
+    WindowedStream<Tuple<Long>> innerstream = map(kVMap);
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count());
+    return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+  }
+
+  /**
+   * Counts the tuples per key in each window. Inputs are converted to key/value pairs with
+   * {@code convertToKeyValue} and the values of each key are accumulated with {@link Count}.
+   */
+  @Override
+  public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
+  {
+    WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
+    KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count());
+    return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+  }
+
+  /**
+   * Keeps the top {@code N} values per key in each window using the {@link TopN} accumulation.
+   * Inputs are converted to key/value pairs with {@code convertToKeyVal}.
+   */
+  @Override
+  public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, List<V>>>>> STREAM topByKey(int N, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+  {
+    TopN<V> top = new TopN<>();
+    top.setN(N);
+    WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+    KeyedWindowedOperatorImpl<K, V, List<V>, List<V>> keyedWindowedOperator = createKeyedWindowedOperator(top);
+    return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+  }
+
+  /**
+   * Keeps the top {@code N} tuples of each window using the {@link TopN} accumulation.
+   */
+  @Override
+  public <STREAM extends WindowedStream<Tuple.WindowedTuple<List<T>>>> STREAM top(int N, Option... opts)
+  {
+    TopN<T> top = new TopN<>();
+    top.setN(N);
+    WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+    WindowedOperatorImpl<T, List<T>, List<T>> windowedOperator = createWindowedOperator(top);
+    return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+  }
+
+  /**
+   * Applies the given {@link Accumulation} to the values of each key in each window.
+   * Inputs are converted to key/value pairs with {@code convertToKeyVal}.
+   */
+  @Override
+  public <K, V, O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM accumulateByKey(Accumulation<V, ACCU, O> accumulation,
+      Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+  {
+    WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+    KeyedWindowedOperatorImpl<K, V, ACCU, O> keyedWindowedOperator = createKeyedWindowedOperator(accumulation);
+    return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+  }
+
+  /**
+   * Applies the given {@link Accumulation} to the tuples of each window.
+   */
+  @Override
+  public <O, ACCU, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM accumulate(Accumulation<T, ACCU, O> accumulation, Option... opts)
+  {
+    WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+    WindowedOperatorImpl<T, ACCU, O> windowedOperator = createWindowedOperator(accumulation);
+    return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+  }
+
+  /**
+   * Reduces the tuples of each window with the given {@link ReduceFn}.
+   */
+  @Override
+  public <STREAM extends WindowedStream<Tuple.WindowedTuple<T>>> STREAM reduce(ReduceFn<T> reduce, Option... opts)
+  {
+    WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+    WindowedOperatorImpl<T, T, T> windowedOperator = createWindowedOperator(reduce);
+    return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+  }
+
+  /**
+   * Reduces the values of each key in each window with the given {@link ReduceFn}.
+   * Inputs are converted to key/value pairs with {@code convertToKeyVal}.
+   */
+  @Override
+  public <K, V, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, V>>>> STREAM reduceByKey(ReduceFn<V> reduce, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+  {
+    WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+    KeyedWindowedOperatorImpl<K, V, V, V> keyedWindowedOperator = createKeyedWindowedOperator(reduce);
+    return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+  }
+
+  /**
+   * Folds the tuples of each window with the given {@link FoldFn}.
+   */
+  @Override
+  public <O, STREAM extends WindowedStream<Tuple.WindowedTuple<O>>> STREAM fold(FoldFn<T, O> fold, Option... opts)
+  {
+    WindowedStream<Tuple<T>> innerstream = map(new ConvertFn<T>());
+    WindowedOperatorImpl<T, O, O> windowedOperator = createWindowedOperator(fold);
+    return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
+  }
+
+  /**
+   * Folds the values of each key in each window with the given {@link FoldFn}.
+   * Inputs are converted to key/value pairs with {@code convertToKeyVal}.
+   */
+  @Override
+  public <K, V, O, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, O>>>> STREAM foldByKey(FoldFn<V, O> fold, Function.ToKeyValue<T, K, V> convertToKeyVal, Option... opts)
+  {
+    WindowedStream<Tuple<KeyValPair<K, V>>> kvstream = map(convertToKeyVal);
+    KeyedWindowedOperatorImpl<K, V, O, O> keyedWindowedOperator = createKeyedWindowedOperator(fold);
+    return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <O, K, STREAM extends WindowedStream<KeyValPair<K, Iterable<O>>>> STREAM groupByKey(Function.ToKeyValue<T, K, O> convertToKeyVal, Option... opts)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <STREAM extends WindowedStream<Iterable<T>>> STREAM group()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Replaces the trigger option used by the windowed operators created from this stream from now on.
+   * This stream is modified in place and returned.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends WindowedStream<T>> STREAM resetTrigger(TriggerOption option)
+  {
+    triggerOption = option;
+    return (STREAM)this;
+  }
+
+  /**
+   * Replaces the allowed lateness used by the windowed operators created from this stream from now on.
+   * This stream is modified in place and returned.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends WindowedStream<T>> STREAM resetAllowedLateness(Duration allowedLateness)
+  {
+    this.allowedLateness = allowedLateness;
+    return (STREAM)this;
+  }
+
+  /**
+   * Creates the stream returned by a transformation as another {@link ApexWindowedStreamImpl} that shares the
+   * graph and inherits this stream's window option, trigger option and allowed lateness.
+   */
+  @Override
+  protected <O> ApexStream<O> newStream(DagMeta graph, Brick<O> newBrick)
+  {
+    ApexWindowedStreamImpl<O> newstream = new ApexWindowedStreamImpl<>();
+    newstream.graph = graph;
+    newstream.lastBrick = newBrick;
+    newstream.windowOption = this.windowOption;
+    newstream.triggerOption = this.triggerOption;
+    newstream.allowedLateness = this.allowedLateness;
+    return newstream;
+  }
+
+  /**
+   * Creates the windowed operator for a non-keyed windowed transformation. The operator uses in-memory storages
+   * and gets this stream's window option, trigger option and allowed lateness, each only when non-null.
+   *
+   * @param accumulationFn the accumulation applied to the tuples of each window
+   * @param <IN> the input type
+   * @param <ACCU> the accumulated type
+   * @param <OUT> the output type
+   * @return the configured operator; it is not added to the DAG here
+   */
+  private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn)
+  {
+    WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>();
+    //TODO use other default setting in the future
+    windowedOperator.setDataStorage(new InMemoryWindowedStorage<ACCU>());
+    windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<OUT>());
+    windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    if (windowOption != null) {
+      windowedOperator.setWindowOption(windowOption);
+    }
+    if (triggerOption != null) {
+      windowedOperator.setTriggerOption(triggerOption);
+    }
+    if (allowedLateness != null) {
+      windowedOperator.setAllowedLateness(allowedLateness);
+    }
+    windowedOperator.setAccumulation(accumulationFn);
+    return windowedOperator;
+  }
+
+  /**
+   * Keyed counterpart of {@link #createWindowedOperator(Accumulation)}. The operator uses in-memory keyed
+   * storages and gets this stream's window option, trigger option and allowed lateness, each only when non-null.
+   *
+   * @param accumulationFn the accumulation applied to the values of each key in each window
+   * @param <K> the key type
+   * @param <V> the value type
+   * @param <ACCU> the accumulated type
+   * @param <OUT> the output type
+   * @return the configured operator; it is not added to the DAG here
+   */
+  private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn)
+  {
+    KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
+
+    //TODO use other default setting in the future
+    keyedWindowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<K, ACCU>());
+    keyedWindowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<K, OUT>());
+    keyedWindowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>());
+    if (windowOption != null) {
+      keyedWindowedOperator.setWindowOption(windowOption);
+    }
+    if (triggerOption != null) {
+      keyedWindowedOperator.setTriggerOption(triggerOption);
+    }
+    if (allowedLateness != null) {
+      keyedWindowedOperator.setAllowedLateness(allowedLateness);
+    }
+
+    keyedWindowedOperator.setAccumulation(accumulationFn);
+    return keyedWindowedOperator;
+  }
+
+}