You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/20 00:40:05 UTC

[2/2] incubator-apex-malhar git commit: APEXMALHAR-2006 Simple Stream API

APEXMALHAR-2006 Simple Stream API


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ab76dacd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ab76dacd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ab76dacd

Branch: refs/heads/master
Commit: ab76dacd3c63a9d1b6b335ca19918cffff1eb728
Parents: cf3bb7d
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu May 19 17:22:20 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu May 19 17:22:20 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/lib/algo/UniqueCounter.java |   2 +-
 pom.xml                                         |   3 +-
 stream/pom.xml                                  | 100 ++++
 .../apex/malhar/stream/api/ApexStream.java      | 261 ++++++++++
 .../apex/malhar/stream/api/WindowedStream.java  |  54 ++
 .../malhar/stream/api/function/Function.java    |  80 +++
 .../malhar/stream/api/impl/ApexStreamImpl.java  | 495 +++++++++++++++++++
 .../apex/malhar/stream/api/impl/DagMeta.java    | 200 ++++++++
 .../malhar/stream/api/impl/IDGenerator.java     |  44 ++
 .../malhar/stream/api/impl/StreamFactory.java   |  81 +++
 .../api/operator/AnnonymousClassModifier.java   | 129 +++++
 .../api/operator/ByteArrayClassLoader.java      |  48 ++
 .../stream/api/operator/FunctionOperator.java   | 378 ++++++++++++++
 .../apex/malhar/stream/api/util/KeyedTuple.java |  32 ++
 .../apex/malhar/stream/api/util/TupleUtil.java  |  34 ++
 .../stream/api/impl/ApexStreamImplTest.java     | 122 +++++
 .../stream/sample/ApplicationWithStreamAPI.java |  57 +++
 .../sample/ApplicationWithStreamAPITest.java    |  53 ++
 .../LocalTestWithoutStreamApplication.java      |  85 ++++
 .../apex/malhar/stream/sample/MyStream.java     |  43 ++
 .../apex/malhar/stream/sample/MyStreamTest.java | 140 ++++++
 .../malhar/stream/sample/TupleCollector.java    |  79 +++
 stream/src/test/resources/data/word.txt         |   2 +
 23 files changed, 2520 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
index 1b6d944..013c8bc 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
@@ -83,7 +83,7 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
     @Override
     public Unifier<HashMap<K, Integer>> getUnifier()
     {
-      UnifierHashMapSumKeys unifierHashMapSumKeys =  new UnifierHashMapSumKeys<K, Integer>();
+      // Parameterized declaration instead of the raw type; presumably UnifierHashMapSumKeys<K, Integer>
+      // is a Unifier<HashMap<K, Integer>>, so the return below needs no unchecked conversion.
+      UnifierHashMapSumKeys<K, Integer> unifierHashMapSumKeys =  new UnifierHashMapSumKeys<>();
       unifierHashMapSumKeys.setType(Integer.class);
       return unifierHashMapSumKeys;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c0e98a3..b75fe13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,10 +188,11 @@
     <profile>
       <id>all-modules</id>
       <modules>
+        <module>kafka</module>
+        <module>stream</module>
         <module>benchmark</module>
         <module>apps</module>
         <module>samples</module>
-        <module>kafka</module>
       </modules>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/pom.xml
----------------------------------------------------------------------
diff --git a/stream/pom.xml b/stream/pom.xml
new file mode 100755
index 0000000..0348236
--- /dev/null
+++ b/stream/pom.xml
@@ -0,0 +1,100 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar</artifactId>
+    <version>3.4.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>malhar-stream</artifactId>
+  <name>Apache Apex Malhar Stream API</name>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <!-- Publish tests jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-library</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-kafka</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>malhar-contrib</artifactId>
+      <version>${project.version}</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-common</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.core.version}</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>cglib</groupId>
+      <artifactId>cglib</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
new file mode 100644
index 0000000..f97e8cc
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * The stream interface used to build a DAG.
+ * <p>
+ * Most methods are generic in {@code STREAM}, the type of stream they return. {@code STREAM} does not
+ * appear in any parameter, so it is inferred from the call site (for example from the type of the
+ * variable the result is assigned to); callers are responsible for choosing a type compatible with the
+ * stream actually returned.
+ *
+ * @param <T> type of the tuples in this stream
+ */
+public interface ApexStream<T>
+{
+  /**
+   * Simple map transformation.<br>
+   * Adds an operator to the DAG which converts each tuple of type T to a tuple of type O.
+   *
+   * @param mapFunction map function
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction);
+
+  /**
+   * Simple map transformation.<br>
+   * Adds an operator to the DAG which converts each tuple of type T to a tuple of type O.
+   *
+   * @param name operator name
+   * @param mapFunction map function
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mapFunction);
+
+  /**
+   * Flat map transformation.<br>
+   * Adds an operator to the DAG which converts each tuple of type T to a collection of tuples of type O.
+   *
+   * @param flatten flat map function
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten);
+
+  /**
+   * Flat map transformation.<br>
+   * Adds an operator to the DAG which converts each tuple of type T to a collection of tuples of type O.
+   *
+   * @param name operator name
+   * @param flatten flat map function
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten);
+
+  /**
+   * Filter transformation.<br>
+   * Adds an operator to the DAG which filters out the tuples that do not satisfy the FilterFunction.
+   *
+   * @param filter filter function
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter);
+
+  /**
+   * Filter transformation.<br>
+   * Adds an operator to the DAG which filters out the tuples that do not satisfy the FilterFunction.
+   *
+   * @param name operator name
+   * @param filter filter function
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter);
+
+  /**
+   * Reduce transformation.<br>
+   * Adds an operator to the DAG which merges two tuples t1 and t2 into a new tuple of the same type.
+   *
+   * @param reduce reduce function
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce);
+
+  /**
+   * Reduce transformation.<br>
+   * Adds an operator to the DAG which merges two tuples t1 and t2 into a new tuple of the same type.
+   *
+   * @param name operator name
+   * @param reduce reduce function
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce);
+
+  /**
+   * Fold transformation.<br>
+   * Adds an operator to the DAG which merges each tuple of type T into an accumulated result of type O.
+   *
+   * @param initialValue initial result value
+   * @param fold fold function
+   * @param <O> result type
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold);
+
+  /**
+   * Fold transformation.<br>
+   * Adds an operator to the DAG which merges each tuple of type T into an accumulated result of type O.
+   *
+   * @param name name of the operator
+   * @param initialValue initial result value
+   * @param fold fold function
+   * @param <O> result type
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold);
+
+  /**
+   * Counts all tuples.
+   *
+   * @return new stream of Integer
+   */
+  <STREAM extends ApexStream<Integer>> STREAM count();
+
+  /**
+   * Counts tuples by key.<br>
+   * If the input is a KeyedTuple, the key is obtained from the tuple's getKey method.<br>
+   * Otherwise the tuple itself is used as the key.
+   *
+   * @return new stream of Map
+   */
+  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey();
+
+  /**
+   * Counts tuples by the indexed key.
+   *
+   * @param key the index of the field in the tuple that is used as the key
+   * @return new stream of Map
+   */
+  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key);
+
+  /**
+   * Extends the DAG by adding one operator.
+   *
+   * @param op operator added to the stream
+   * @param inputPort input port of the operator that is connected to the last exposed output port in the stream
+   * @param outputPort output port of the operator that will be connected to the next operator
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort,  Operator.OutputPort<O> outputPort);
+
+  /**
+   * Extends the DAG by adding one {@link Operator}.
+   *
+   * @param opName operator name
+   * @param op operator added to the stream
+   * @param inputPort input port of the operator that is connected to the last exposed output port in the stream
+   * @param outputPort output port of the operator that will be connected to the next operator
+   * @param <O> type of the output
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort,  Operator.OutputPort<O> outputPort);
+
+  /**
+   * Unions multiple streams into one.
+   *
+   * @param others other streams
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM union(ApexStream<T>... others);
+
+  /**
+   * Adds a stdout console output operator.
+   *
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM print();
+
+  /**
+   * Adds a stderr console output operator.
+   *
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM printErr();
+
+  /**
+   * Sets an attribute value.<br>
+   * If it is a {@link DAGContext DAG attribute}, it is applied to the whole DAG.<br>
+   * If it is an {@link OperatorContext operator attribute}, it is applied to the last connected operator.<br>
+   * If it is an {@link PortContext input port attribute}, it is applied to the input port of the last
+   * connected stream.<br>
+   * If it is an {@link PortContext output port attribute}, it is applied to the output port of the last
+   * connected stream.<br>
+   * If it is both an input port and an output port attribute, it is applied to the last connected stream.
+   *
+   * @param attribute the {@link Attribute} to set
+   * @param value value of the attribute
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM with(Attribute attribute, Object value);
+
+  /**
+   * Sets an attribute at the DAG level.
+   *
+   * @param attribute the {@link Attribute} to set
+   * @param value value of the attribute
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM setGlobalAttribute(Attribute attribute, Object value);
+
+  /**
+   * Sets the locality.
+   *
+   * @param locality the {@link DAG.Locality} to use
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM with(DAG.Locality locality);
+
+  /**
+   * Sets a property value on the last connected operator.
+   *
+   * @param propName property name
+   * @param value value of the property
+   * @return stream itself
+   */
+  <STREAM extends ApexStream<T>> STREAM with(String propName, Object value);
+
+  /**
+   * Creates a DAG from the stream.
+   *
+   * @return the created {@link DAG}
+   */
+  DAG createDag();
+
+  /**
+   * Populates an existing DAG from the stream.
+   *
+   * @param dag the {@link DAG} to populate
+   */
+  void populateDag(DAG dag);
+
+  /**
+   * Runs the stream application in local (embedded) mode.<br>
+   * In async mode the method returns immediately and the DAG runs for {@code duration} milliseconds.<br>
+   * In sync mode the method blocks for {@code duration} milliseconds while the DAG runs.<br>
+   * If {@code duration} is negative the DAG runs forever until it is killed.
+   *
+   * @param async true to run in async mode, false to run in sync mode
+   * @param duration how long to run the DAG, in milliseconds; negative means run until killed
+   * @param exitCondition condition that can end the run early; exact semantics (polling, null handling)
+   *        are defined by the implementation (TODO confirm)
+   */
+  void runEmbedded(boolean async, long duration, Callable<Boolean> exitCondition);
+
+  /**
+   * Submits the application to a cluster.
+   */
+  void run();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
new file mode 100644
index 0000000..a3bc846
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+/**
+ * A stream with windowed transformations.
+ * <p>
+ * <b>Transformation types:</b>
+ * <ul>
+ * <li>Combine</li>
+ * <li>Group</li>
+ * <li>Keyed Combine</li>
+ * <li>Keyed Group</li>
+ * <li>Join</li>
+ * <li>CoGroup</li>
+ * </ul>
+ * <p>
+ * <b>Features supported with windowed transformations:</b>
+ * <ul>
+ * <li>Watermark - ingestion-time watermark / logical tuple watermark</li>
+ * <li>Early triggers - how frequently to emit real-time partial results</li>
+ * <li>Late triggers - when to emit an updated result for tuples that arrive after the watermark</li>
+ * <li>Customizable trigger behaviour - what to do when a trigger fires</li>
+ * <li>Spool window state - in-memory window state can be spooled to disk when it is full</li>
+ * <li>Three accumulation models: ignore, accumulation, accumulation + delta</li>
+ * <li>Window support: non-mergeable windows (fixed window, sliding window) and mergeable windows
+ * (session window), based on three different tuple times</li>
+ * <li>Different tuple time support: event time, system time, ingestion time</li>
+ * </ul>
+ * <p>
+ * NOTE: this interface currently declares no methods of its own beyond those inherited from
+ * {@link ApexStream}; the lists above describe the intended scope of windowed streams rather than
+ * operations that are available today.
+ *
+ * @param <T> output tuple type
+ */
+public interface WindowedStream<T> extends ApexStream<T>
+{
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
new file mode 100644
index 0000000..1167744
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.function;
+
+/**
+ * The top-level interface for all functions used by stream transformations.
+ */
+public interface Function
+{
+  /**
+   * Marker interface: if a {@link Function} also implements this interface,
+   * the state of the function will be checkpointed.
+   */
+  interface Stateful
+  {
+
+  }
+
+  /**
+   * An interface that defines a one-input, one-output transformation.
+   *
+   * @param <I> input tuple type
+   * @param <O> output tuple type
+   */
+  interface MapFunction<I, O> extends Function
+  {
+    /**
+     * Transforms a single input tuple.
+     *
+     * @param input the input tuple
+     * @return the transformed tuple
+     */
+    O f(I input);
+  }
+
+  /**
+   * An interface that defines a reduce transformation.
+   *
+   * @param <T> type of both the inputs and the result
+   */
+  interface ReduceFunction<T> extends Function
+  {
+    /**
+     * Merges two tuples into one.
+     *
+     * @param t1 first tuple
+     * @param t2 second tuple
+     * @return the merged tuple
+     */
+    T reduce(T t1, T t2);
+  }
+
+  /**
+   * An interface that defines a fold transformation.
+   *
+   * @param <I> input tuple type
+   * @param <O> accumulated result type
+   */
+  interface FoldFunction<I, O> extends Function
+  {
+    /**
+     * Merges an input tuple into the accumulated result.
+     *
+     * @param input the input tuple
+     * @param output the result accumulated so far
+     * @return the new accumulated result
+     */
+    O fold(I input, O output);
+  }
+
+  /**
+   * An interface that defines a flatmap transformation: each input tuple is mapped to an
+   * {@link Iterable} of output tuples.
+   *
+   * @param <I> input tuple type
+   * @param <O> type of the elements of the returned iterable
+   */
+  interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>>
+  {
+  }
+
+  /**
+   * An interface that defines a filter transformation: a {@link MapFunction} returning a
+   * {@link Boolean} for each tuple.
+   *
+   * @param <T> tuple type
+   */
+  interface FilterFunction<T> extends MapFunction<T, Boolean>
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
new file mode 100644
index 0000000..0e47727
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Default stream implementation for ApexStream interface.
+ * It creates the dag(execution plan) from stream api
+ */
+public class ApexStreamImpl<T> implements ApexStream<T>
+{
+
+  private static Set<Attribute<?>> OPERATOR_ATTRIBUTES;
+
+  private static Set<Attribute<?>> DAG_ATTRIBUTES;
+
+  private static Set<Attribute<?>> INPUT_ATTRIBUTES;
+
+  private static Set<Attribute<?>> OUTPUT_ATTRIBUTES;
+
+  static {
+
+    OPERATOR_ATTRIBUTES = new HashSet<>();
+    DAG_ATTRIBUTES = new HashSet<>();
+    INPUT_ATTRIBUTES = new HashSet<>();
+    OUTPUT_ATTRIBUTES = new HashSet<>();
+
+    try {
+      for (Field field : Context.OperatorContext.class.getDeclaredFields()) {
+        if (field.getType() == Attribute.class) {
+          OPERATOR_ATTRIBUTES.add((Attribute)field.get(Context.OperatorContext.class));
+        }
+      }
+
+      for (Field field : Context.DAGContext.class.getDeclaredFields()) {
+        if (field.getType() == Attribute.class) {
+          DAG_ATTRIBUTES.add((Attribute)field.get(Context.DAGContext.class));
+        }
+      }
+    } catch (IllegalAccessException e) {
+      //Ignore here
+    }
+
+    INPUT_ATTRIBUTES.add(Context.PortContext.PARTITION_PARALLEL);
+    INPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
+    INPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
+    INPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);
+
+
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.QUEUE_CAPACITY);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.BUFFER_MEMORY_MB);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.SPIN_MILLIS);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.UNIFIER_SINGLE_FINAL);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.IS_OUTPUT_UNIFIED);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
+    OUTPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);
+
+  }
+
+  /**
+   * The extension point of a stream: the DAG node added last, the output port the next operator
+   * connects to, and the port pair of the stream that attached that node.
+   *
+   * @param <T> type of the tuples emitted on the last output port
+   */
+  public static class Brick<T>
+  {
+    /** Meta data of the node this brick points at. */
+    private DagMeta.NodeMeta nodeMeta;
+
+    /** Output port that the next operator will be connected to. */
+    private Operator.OutputPort<T> lastOutput;
+
+    /** Upstream output port and this node's input port; addOperator leaves it null for the first node. */
+    private Pair<Operator.OutputPort, Operator.InputPort> lastStream;
+
+    public Pair<Operator.OutputPort, Operator.InputPort> getLastStream()
+    {
+      return lastStream;
+    }
+
+    public void setLastStream(Pair<Operator.OutputPort, Operator.InputPort> lastStream)
+    {
+      this.lastStream = lastStream;
+    }
+
+    public Operator.OutputPort<T> getLastOutput()
+    {
+      return lastOutput;
+    }
+
+    public void setLastOutput(Operator.OutputPort<T> lastOutput)
+    {
+      this.lastOutput = lastOutput;
+    }
+  }
+
+  /**
+   * Stream that addOperator calls are forwarded to, or null when this stream builds the graph itself.
+   */
+  private ApexStream<T> delegator;
+
+  /**
+   * Graph behind the stream.
+   */
+  private DagMeta graph;
+
+  /**
+   * The current extension point of the stream. Only a single extension point is supported,
+   * but any number of downstream operators may be connected to it.
+   */
+  private Brick<T> lastBrick;
+
+  /**
+   * @return the current extension point; null until an operator has been added
+   */
+  public Brick<T> getLastBrick()
+  {
+    return this.lastBrick;
+  }
+
+  /**
+   * @param lastBrick the new extension point of this stream
+   */
+  public void setLastBrick(Brick<T> lastBrick)
+  {
+    this.lastBrick = lastBrick;
+  }
+
+  /**
+   * Creates a stream backed by a new, empty graph.
+   */
+  public ApexStreamImpl()
+  {
+    graph = new DagMeta();
+  }
+
+  /**
+   * Creates a stream that forwards addOperator calls to {@code apexStream}. When the delegator is itself
+   * an {@code ApexStreamImpl}, its graph and current extension point are shared with this stream.
+   *
+   * @param apexStream the delegator; may be null, in which case no graph or extension point is set
+   */
+  public ApexStreamImpl(ApexStream<T> apexStream)
+  {
+    this.delegator = apexStream;
+    if (!(apexStream instanceof ApexStreamImpl)) {
+      // instanceof is false for null as well, so nothing else to share
+      return;
+    }
+    ApexStreamImpl<T> source = (ApexStreamImpl<T>)apexStream;
+    this.graph = source.graph;
+    this.lastBrick = source.lastBrick;
+  }
+
+  /**
+   * Creates a stream on an existing graph that has no extension point yet.
+   */
+  public ApexStreamImpl(DagMeta graph)
+  {
+    this(graph, null);
+  }
+
+  /**
+   * Creates a stream on an existing graph whose next operator will be attached to {@code lastBrick}.
+   */
+  public ApexStreamImpl(DagMeta graph, Brick<T> lastBrick)
+  {
+    this.lastBrick = lastBrick;
+    this.graph = graph;
+  }
+
+  /**
+   * Maps each tuple with {@code mf}; the operator is named after {@code mf.toString()}.
+   */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf)
+  {
+    String operatorName = mf.toString();
+    return map(operatorName, mf);
+  }
+
+  /**
+   * Maps each tuple with {@code mf} through a MapFunctionOperator named {@code name}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mf)
+  {
+    final FunctionOperator.MapFunctionOperator<T, O> mapOperator = new FunctionOperator.MapFunctionOperator<>(mf);
+    return (STREAM)addOperator(name, mapOperator, mapOperator.input, mapOperator.output);
+  }
+
+  /**
+   * Flat-maps each tuple with {@code flatten}; the operator is named after {@code flatten.toString()}.
+   */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten)
+  {
+    String operatorName = flatten.toString();
+    return flatMap(operatorName, flatten);
+  }
+
+  /**
+   * Flat-maps each tuple with {@code flatten} through a FlatMapFunctionOperator named {@code name}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten)
+  {
+    final FunctionOperator.FlatMapFunctionOperator<T, O> flatMapOperator =
+        new FunctionOperator.FlatMapFunctionOperator<>(flatten);
+    return (STREAM)addOperator(name, flatMapOperator, flatMapOperator.input, flatMapOperator.output);
+  }
+
+  /**
+   * Filters tuples with {@code filter}; the operator is named after {@code filter.toString()}.
+   */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter)
+  {
+    String operatorName = filter.toString();
+    return filter(operatorName, filter);
+  }
+
+  /**
+   * Filters tuples with {@code filter} through a FilterFunctionOperator named {@code name}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter)
+  {
+    final FunctionOperator.FilterFunctionOperator<T> filterOperator = new FunctionOperator.FilterFunctionOperator<>(filter);
+    return (STREAM)addOperator(name, filterOperator, filterOperator.input, filterOperator.output);
+  }
+
+  /**
+   * Reduces tuples with {@code reduce}; the operator is named after {@code reduce.toString()}.
+   */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce)
+  {
+    String operatorName = reduce.toString();
+    return reduce(operatorName, reduce);
+  }
+
+  /**
+   * Reduces tuples with {@code reduce} through a ReduceFunctionOperator named {@code name}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce)
+  {
+    final FunctionOperator.ReduceFunctionOperator<T> reduceOperator = new FunctionOperator.ReduceFunctionOperator<>(reduce);
+    return (STREAM)addOperator(name, reduceOperator, reduceOperator.input, reduceOperator.output);
+  }
+
+  /**
+   * Folds tuples into {@code initialValue} with {@code fold}; the operator is named after {@code fold.toString()}.
+   */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold)
+  {
+    String operatorName = fold.toString();
+    return fold(operatorName, initialValue, fold);
+  }
+
+  /**
+   * Folds tuples into {@code initialValue} with {@code fold} through a FoldFunctionOperator named {@code name}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold)
+  {
+    final FunctionOperator.FoldFunctionOperator<T, O> foldOperator =
+        new FunctionOperator.FoldFunctionOperator<>(fold, initialValue);
+    return (STREAM)addOperator(name, foldOperator, foldOperator.input, foldOperator.output);
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <STREAM extends ApexStream<Integer>> STREAM count()
+  {
+    throw new UnsupportedOperationException("count() is not supported yet");
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key)
+  {
+    throw new UnsupportedOperationException("countByKey(int) is not supported yet");
+  }
+
+  /**
+   * Counts tuples with a cumulative {@link UniqueCounter} added under the name "CounterByKey".
+   * NOTE(review): the counter presumably keys on the tuple itself (KeyedTuple keys are not honoured yet),
+   * and the fixed operator name may collide if this is called twice on one graph - confirm DagMeta naming.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey()
+  {
+    // TODO: UniqueCounter has to be changed to support keys
+    final UniqueCounter<Object> counter = new UniqueCounter<>();
+    counter.setCumulative(true);
+    final Operator.OutputPort<? extends Map<Object, Integer>> countPort = counter.count;
+    final Operator.InputPort<T> dataPort = (Operator.InputPort<T>)counter.data;
+    return (STREAM)addOperator("CounterByKey", counter, dataPort, countPort);
+  }
+
+  /**
+   * Appends {@code op}, using its {@code toString()} as the operator name.
+   */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+  {
+    String defaultName = op.toString();
+    return addOperator(defaultName, op, inputPort, outputPort);
+  }
+
+
+  /**
+   * Appends {@code op} to the end of this stream, connecting the current last output port to
+   * {@code inputPort}, and returns a new stream whose last output is {@code outputPort}.
+   * If this stream has no operator yet, {@code op} becomes a new head of the graph.
+   * When a delegator is set, the operator is added through it and the result is re-wrapped in this
+   * stream's runtime class via its public {@code (ApexStream)} constructor.
+   *
+   * @throws IllegalArgumentException if {@code op} is null or a non-null port is not a field of {@code op}
+   * @throws RuntimeException if re-wrapping through the delegator constructor fails (cause preserved)
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+  {
+    if (delegator != null) {
+      ApexStreamImpl<O> apexStream = delegator.addOperator(opName, op, inputPort, outputPort);
+      try {
+        return (STREAM)this.getClass().getConstructor(ApexStream.class).newInstance(apexStream);
+      } catch (ReflectiveOperationException e) {
+        // Keep the reflective failure as the cause so the real problem (missing ctor, ctor exception) is visible.
+        throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as delegator", e);
+      }
+    }
+
+    checkArguments(op, inputPort, outputPort);
+
+    DagMeta.NodeMeta nm;
+    if (lastBrick == null) {
+      // First operator of this stream: no upstream port to connect.
+      nm = graph.addNode(opName, op, null, null, inputPort);
+    } else {
+      nm = graph.addNode(opName, op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort);
+    }
+
+    Brick<O> newBrick = new Brick<>();
+    newBrick.nodeMeta = nm;
+    newBrick.setLastOutput(outputPort);
+    if (lastBrick != null) {
+      // Remember the connecting stream so later with(...) calls can target its ports.
+      newBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(lastBrick.lastOutput, inputPort);
+    }
+
+    return (STREAM)new ApexStreamImpl<>(this.graph, newBrick);
+  }
+
+  /**
+   * Checks that {@code inputPort} and {@code outputPort} are values of public transient fields of {@code op}.
+   * A {@code null} port is not checked.
+   *
+   * @throws IllegalArgumentException if {@code op} is null or a non-null port is not owned by {@code op}
+   */
+  private void checkArguments(Operator op, Operator.InputPort inputPort, Operator.OutputPort outputPort)
+  {
+    if (op == null) {
+      throw new IllegalArgumentException("Operator can not be null");
+    }
+
+    boolean inputOwned = (inputPort == null);
+    boolean outputOwned = (outputPort == null);
+    Field[] publicFields = op.getClass().getFields();
+    // Stop scanning as soon as both ports have been located.
+    for (int idx = 0; idx < publicFields.length && !(inputOwned && outputOwned); idx++) {
+      Field candidate = publicFields[idx];
+      int mods = candidate.getModifiers();
+      if (Modifier.isPublic(mods) && Modifier.isTransient(mods)) {
+        Object value;
+        try {
+          value = candidate.get(op);
+        } catch (IllegalAccessException ignored) {
+          // NonAccessible field is not a valid port object
+          continue;
+        }
+        outputOwned |= (value == outputPort);
+        inputOwned |= (value == inputPort);
+      }
+    }
+
+    if (!(inputOwned && outputOwned)) {
+      throw new IllegalArgumentException("Input port " + inputPort + " and/or Output port " + outputPort + " is/are not owned by Operator " + op);
+    }
+  }
+
+  /**
+   * Merging this stream with others is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM union(ApexStream<T>... others)
+  {
+    throw new UnsupportedOperationException("union() is not supported yet");
+  }
+
+  /**
+   * Attaches a {@link ConsoleOutputOperator} sink, with a UUID-based name, to the current end of the stream.
+   * Returns this stream itself, so operators chained afterwards are appended after the current last
+   * operator rather than after the sink.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStreamImpl<T> print()
+  {
+    ConsoleOutputOperator console = new ConsoleOutputOperator();
+    String consoleName = IDGenerator.generateOperatorIDWithUUID(console.getClass());
+    Operator.InputPort<T> consoleInput = (Operator.InputPort<T>)console.input;
+    addOperator(consoleName, console, consoleInput, null);
+    return this;
+  }
+
+  /**
+   * Printing to stderr is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ApexStream<T> printErr()
+  {
+    //TODO need to make ConsoleOutputOperator support stderr
+    throw new UnsupportedOperationException("printErr() is not supported yet");
+  }
+
+  /**
+   * Attaches {@code attribute} to the most recently added operator and the stream feeding it.
+   * <ul>
+   *   <li>operator attributes are recorded on the last operator;</li>
+   *   <li>input port attributes on the last operator's input port of the incoming stream;</li>
+   *   <li>output port attributes on the upstream (parent) operator's output port of that stream.</li>
+   * </ul>
+   * Port attributes are ignored when the last operator has no incoming stream.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(Attribute attribute, Object value)
+  {
+    if (OPERATOR_ATTRIBUTES.contains(attribute)) {
+      lastBrick.nodeMeta.operatorAttributes.add(Pair.of(attribute, value));
+    }
+
+    if (INPUT_ATTRIBUTES.contains(attribute) && lastBrick.lastStream != null) {
+      List<Pair<Attribute, Object>> attrs = lastBrick.nodeMeta.inputPortAttributes.get(lastBrick.lastStream.getRight());
+      if (attrs == null) {
+        attrs = new LinkedList<>();
+        lastBrick.nodeMeta.inputPortAttributes.put(lastBrick.lastStream.getRight(), attrs);
+      }
+      attrs.add(Pair.of(attribute, value));
+    }
+
+    if (OUTPUT_ATTRIBUTES.contains(attribute) && lastBrick.lastStream != null) {
+      // The output port of the incoming stream belongs to the upstream operator, so the attribute has to be
+      // stored on the parent node: DagMeta applies output port attributes only for a node's own ports.
+      for (DagMeta.NodeMeta parent : lastBrick.nodeMeta.getParent()) {
+        if (!parent.getNodeStreams().containsKey(lastBrick.lastStream.getLeft())) {
+          continue;
+        }
+        List<Pair<Attribute, Object>> attrs = parent.outputPortAttributes.get(lastBrick.lastStream.getLeft());
+        if (attrs == null) {
+          attrs = new LinkedList<>();
+          parent.outputPortAttributes.put(lastBrick.lastStream.getLeft(), attrs);
+        }
+        attrs.add(Pair.of(attribute, value));
+      }
+    }
+
+    // NOTE(review): every attribute, including operator/port ones, is also recorded as a DAG-level
+    // attribute here — confirm this is intended.
+    setGlobalAttribute(attribute, value);
+
+    return this;
+  }
+
+  /**
+   * Records a DAG-level attribute in the underlying graph's {@code dagAttributes} list.
+   * NOTE(review): DagMeta.buildDAG(DAG) does not read dagAttributes — confirm they are applied elsewhere.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> setGlobalAttribute(Attribute attribute, Object value)
+  {
+    Pair<Attribute, Object> globalAttr = Pair.of(attribute, value);
+    this.graph.dagAttributes.add(globalAttr);
+    return this;
+  }
+
+  /**
+   * Sets the locality of the stream that feeds the most recently added operator.
+   * Does nothing when that operator has no incoming stream.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(DAG.Locality locality)
+  {
+    if (lastBrick.lastStream == null) {
+      return this;
+    }
+    for (DagMeta.NodeMeta upstream : lastBrick.nodeMeta.getParent()) {
+      Pair<List<Operator.InputPort>, DAG.Locality> streamInfo = upstream.getNodeStreams().get(lastBrick.lastStream.getLeft());
+      if (streamInfo == null) {
+        continue;
+      }
+      // The entry is created as a MutablePair in DagMeta.NodeMeta, so setValue updates the locality in place.
+      streamInfo.setValue(locality);
+    }
+    return this;
+  }
+
+  /**
+   * Sets bean property {@code propName} of the most recently added operator through
+   * {@link BeanUtils#setProperty}. Any failure is rethrown as a {@link RuntimeException} wrapping the cause.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(String propName, Object value)
+  {
+    try {
+      Operator target = lastBrick.nodeMeta.getOperator();
+      BeanUtils.setProperty(target, propName, value);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return this;
+  }
+
+
+  /**
+   * Builds a fresh {@link LogicalPlan} populated from this stream's graph.
+   */
+  @Override
+  public DAG createDag()
+  {
+    LogicalPlan plan = new LogicalPlan();
+    this.populateDag(plan);
+    return plan;
+  }
+
+  /**
+   * Adds the operators, streams and operator/port attributes recorded in the graph to {@code dag}.
+   */
+  @Override
+  public void populateDag(DAG dag)
+  {
+    this.graph.buildDAG(dag);
+  }
+
+  /**
+   * Runs the application in-process using {@link LocalMode}.
+   *
+   * @param async when true the controller is started with {@code runAsync()} and {@code duration} is ignored
+   * @param duration passed to {@code Controller.run(long)} when non-negative (presumably milliseconds);
+   *                 a negative value runs via {@code run()} without a time limit argument
+   * @param exitCondition applied only when the controller is a {@link StramLocalCluster}
+   */
+  @Override
+  public void runEmbedded(boolean async, long duration, Callable<Boolean> exitCondition)
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    populateDag(localMode.getDAG());
+    LocalMode.Controller controller = localMode.getController();
+    if (controller instanceof StramLocalCluster) {
+      ((StramLocalCluster)controller).setExitCondition(exitCondition);
+    }
+
+    if (async) {
+      controller.runAsync();
+    } else if (duration < 0) {
+      controller.run();
+    } else {
+      controller.run(duration);
+    }
+  }
+
+
+  /**
+   * Submitting the application to a cluster is not implemented yet; use {@link #runEmbedded} to run locally.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void run()
+  {
+    //TODO need an api to submit the StreamingApplication to cluster
+    throw new UnsupportedOperationException("run() is not supported yet; use runEmbedded() for local execution");
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
new file mode 100644
index 0000000..15417b5
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Graph data structure for the DAG.
+ * Operators and streams are recorded here first and only materialized into a {@link DAG} by
+ * {@link #buildDAG(DAG)}, which leaves room for lazy loading and optimization.
+ */
+public class DagMeta
+{
+
+  // Nodes without a parent; DAG construction starts from these.
+  private List<NodeMeta> heads = new LinkedList<>();
+
+  // DAG-level attributes.
+  // NOTE(review): buildDAG(DAG) does not apply these — confirm whether they are consumed elsewhere.
+  List<Pair<Attribute, Object>> dagAttributes = new LinkedList<>();
+
+  /**
+   * One operator of the graph together with its outgoing streams, attributes and graph links.
+   */
+  public static class NodeMeta
+  {
+
+    private String nodeName;
+
+    private Operator operator;
+
+    List<Pair<Attribute, Object>> operatorAttributes = new LinkedList<>();
+
+    // output port -> (downstream input ports, stream locality). Pre-filled with one mutable entry per output
+    // port of the operator so that connections and locality can be recorded later.
+    private Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> nodeStreams = new HashMap<>();
+
+    Map<Operator.OutputPort, List<Pair<Attribute, Object>>> outputPortAttributes = new HashMap<>();
+
+    Map<Operator.InputPort, List<Pair<Attribute, Object>>> inputPortAttributes = new HashMap<>();
+
+    // All input ports of the operator; used when applying input port attributes.
+    private Set<Operator.InputPort> operatorInputs = new HashSet<>();
+
+    private List<NodeMeta> children = new LinkedList<>();
+
+    private List<NodeMeta> parent = new LinkedList<>();
+
+    public List<NodeMeta> getParent()
+    {
+      return parent;
+    }
+
+    public List<NodeMeta> getChildren()
+    {
+      return children;
+    }
+
+    public String getNodeName()
+    {
+      return nodeName;
+    }
+
+    public Operator getOperator()
+    {
+      return operator;
+    }
+
+    public Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> getNodeStreams()
+    {
+      return nodeStreams;
+    }
+
+    /**
+     * Creates a node for {@code operator} and registers its ports. Ports are the public transient fields
+     * whose type is an {@link Operator.OutputPort} or an {@link Operator.InputPort}.
+     */
+    public NodeMeta(Operator operator, String nodeName)
+    {
+      this.nodeName = nodeName;
+      this.operator = operator;
+
+      for (Field field : this.operator.getClass().getFields()) {
+        int modifier = field.getModifiers();
+        if (!Modifier.isPublic(modifier) || !Modifier.isTransient(modifier)) {
+          continue;
+        }
+        boolean isOutput = Operator.OutputPort.class.isAssignableFrom(field.getType());
+        boolean isInput = Operator.InputPort.class.isAssignableFrom(field.getType());
+        if (!isOutput && !isInput) {
+          continue;
+        }
+        Object port;
+        try {
+          port = field.get(operator);
+        } catch (IllegalAccessException e) {
+          // The field is public, but the read can still fail (e.g. presumably when the declaring class is
+          // not accessible from here); such ports are skipped.
+          continue;
+        }
+        if (isOutput) {
+          nodeStreams.put((Operator.OutputPort)port,
+              MutablePair.<List<Operator.InputPort>, DAG.Locality>of(new LinkedList<Operator.InputPort>(), null));
+        }
+        if (isInput) {
+          operatorInputs.add((Operator.InputPort)port);
+        }
+      }
+    }
+
+  }
+
+  public DagMeta()
+  {
+
+  }
+
+  /**
+   * Builds a new {@link LogicalPlan} from this graph.
+   */
+  public DAG buildDAG()
+  {
+    DAG dag = new LogicalPlan();
+    buildDAG(dag);
+    return dag;
+  }
+
+  /**
+   * Adds every operator reachable from the heads, plus their streams and operator/port attributes, to {@code dag}.
+   */
+  public void buildDAG(DAG dag)
+  {
+    for (NodeMeta nm : heads) {
+      visitNode(nm, dag);
+    }
+  }
+
+  /**
+   * Adds {@code nm} and (recursively) its children to {@code dag}, then its outgoing streams and attributes.
+   */
+  private void visitNode(NodeMeta nm, DAG dag)
+  {
+    dag.addOperator(nm.nodeName, nm.operator);
+    // Children are visited first so the downstream operators exist before streams to them are added
+    // (presumably required by DAG.addStream — confirm).
+    for (NodeMeta child : nm.children) {
+      visitNode(child, dag);
+    }
+
+    for (Map.Entry<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> entry : nm.nodeStreams.entrySet()) {
+      Operator.OutputPort outputPort = entry.getKey();
+      List<Operator.InputPort> sinks = entry.getValue().getLeft();
+      if (outputPort == null || sinks == null || sinks.isEmpty()) {
+        // Unresolved or unconnected output port: no stream to create.
+        continue;
+      }
+      DAG.StreamMeta streamMeta = dag.addStream(outputPort.toString(), outputPort,
+          sinks.toArray(new Operator.InputPort[sinks.size()]));
+      // set locality
+      DAG.Locality locality = entry.getValue().getRight();
+      if (locality != null) {
+        streamMeta.setLocality(locality);
+      }
+      //set attributes for output port
+      List<Pair<Attribute, Object>> outputAttrs = nm.outputPortAttributes.get(outputPort);
+      if (outputAttrs != null) {
+        for (Pair<Attribute, Object> attr : outputAttrs) {
+          dag.setOutputPortAttribute(outputPort, attr.getLeft(), attr.getRight());
+        }
+      }
+    }
+
+    //set input port attributes
+    for (Operator.InputPort input : nm.operatorInputs) {
+      List<Pair<Attribute, Object>> inputAttrs = nm.inputPortAttributes.get(input);
+      if (inputAttrs != null) {
+        for (Pair<Attribute, Object> attr : inputAttrs) {
+          dag.setInputPortAttribute(input, attr.getLeft(), attr.getRight());
+        }
+      }
+    }
+
+    // set operator attributes
+    for (Pair<Attribute, Object> attr : nm.operatorAttributes) {
+      dag.setAttribute(nm.operator, attr.getLeft(), attr.getRight());
+    }
+  }
+
+  /**
+   * Adds a node for {@code operator}. When {@code parent} is null the node becomes a new head. Otherwise
+   * {@code inputPort} is connected to {@code parentOutput} of {@code parent}.
+   *
+   * @throws IllegalArgumentException if {@code parentOutput} is not a registered output port of {@code parent}
+   */
+  public NodeMeta addNode(String nodeName, Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort)
+  {
+    NodeMeta newNode = new NodeMeta(operator, nodeName);
+    if (parent == null) {
+      heads.add(newNode);
+    } else {
+      Pair<List<Operator.InputPort>, DAG.Locality> stream = parent.nodeStreams.get(parentOutput);
+      if (stream == null) {
+        throw new IllegalArgumentException("Output port " + parentOutput + " is not an output port of operator " + parent.nodeName);
+      }
+      stream.getLeft().add(inputPort);
+      parent.children.add(newNode);
+      newNode.parent.add(parent);
+    }
+    return newNode;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
new file mode 100644
index 0000000..36200f2
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.UUID;
+
+import com.datatorrent.api.Operator;
+
+import static java.lang.System.currentTimeMillis;
+
+/**
+ * Helpers that build operator ids from an operator class name plus a varying suffix.
+ */
+public class IDGenerator
+{
+
+  /**
+   * Returns the operator's class name immediately followed by the current time in milliseconds.
+   * Two calls for the same class within the same millisecond yield the same id.
+   */
+  public static String generateOperatorIDWithClock(Class<? extends Operator> operatorClazz)
+  {
+    return new StringBuilder(operatorClazz.getName()).append(currentTimeMillis()).toString();
+  }
+
+  /**
+   * Returns the operator's class name immediately followed by a random UUID.
+   */
+  public static String generateOperatorIDWithUUID(Class<? extends Operator> operatorClazz)
+  {
+    return new StringBuilder(operatorClazz.getName()).append(UUID.randomUUID()).toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
new file mode 100644
index 0000000..70a11b5
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+import org.apache.apex.malhar.stream.api.ApexStream;
+
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+
+/**
+ * A factory for creating streams from different kinds of input sources.
+ */
+public class StreamFactory
+{
+  /**
+   * Creates a stream of strings emitted by a {@link LineByLineFileInputOperator} reading {@code folderName}.
+   *
+   * @param inputOperatorName name of the input operator in the DAG
+   * @param folderName directory handed to the operator
+   */
+  public static ApexStream<String> fromFolder(String inputOperatorName, String folderName)
+  {
+    LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator();
+    fileLineInputOperator.setDirectory(folderName);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(inputOperatorName, fileLineInputOperator, null, fileLineInputOperator.output);
+  }
+
+  /**
+   * Same as {@link #fromFolder(String, String)} with the operator name "FolderScanner".
+   */
+  public static ApexStream<String> fromFolder(String folderName)
+  {
+    return fromFolder("FolderScanner", folderName);
+  }
+
+  /**
+   * Same as {@link #fromKafka08(String, String, String)} with the operator name "Kafka08Input".
+   */
+  public static ApexStream<String> fromKafka08(String zookeepers, String topic)
+  {
+    return fromKafka08("Kafka08Input", zookeepers, topic);
+  }
+
+  /**
+   * Creates a stream of string messages consumed from a Kafka 0.8 topic.
+   *
+   * @param inputName name of the input operator in the DAG
+   * @param zookeepers zookeeper connection string passed to the consumer
+   * @param topic topic to consume
+   */
+  public static ApexStream<String> fromKafka08(String inputName, String zookeepers, String topic)
+  {
+    KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
+    kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic);
+    kafkaSinglePortStringInputOperator.getConsumer().setZookeeper(zookeepers);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    // Use the caller-supplied name; previously it was ignored and the operator's toString() was used.
+    return newStream.addOperator(inputName, kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort);
+  }
+
+  /**
+   * Creates a stream from an arbitrary input operator.
+   *
+   * @param inputOperatorName name of the input operator in the DAG
+   * @param operator the input operator
+   * @param outputPort the port of {@code operator} that feeds the stream
+   */
+  public static <T> ApexStream<T> fromInput(String inputOperatorName, InputOperator operator, Operator.OutputPort<T> outputPort)
+  {
+    ApexStreamImpl<T> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(inputOperatorName, operator, null, outputPort);
+  }
+
+  /**
+   * Same as {@link #fromInput(String, InputOperator, Operator.OutputPort)}, named by the operator's {@code toString()}.
+   */
+  public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort)
+  {
+    return fromInput(operator.toString(), operator, outputPort);
+  }
+
+  /**
+   * Kafka 0.9 input is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public static ApexStream<String> fromKafka09(String name, String brokers, String topic)
+  {
+    throw new UnsupportedOperationException("Kafka 0.9 input is not supported yet");
+  }
+
+  /**
+   * Same as {@link #fromKafka09(String, String, String)} with the operator name "KafkaInput".
+   */
+  public static ApexStream<String> fromKafka09(String brokers, String topic)
+  {
+    return fromKafka09("KafkaInput", brokers, topic);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
new file mode 100644
index 0000000..9628354
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+/**
+ * Because anonymous class serialization is not supported by default in most serialization libraries,
+ * this class modifies the bytecode of an anonymous class at runtime:
+ * <ul>
+ *   <li>the class is made public and top level;</li>
+ *   <li>its constructors are made public;</li>
+ *   <li>a public no-arg constructor is added if missing.</li>
+ * </ul>
+ * The limitation is that the anonymous class being modified must be stateless.
+ *
+ */
+public class AnnonymousClassModifier extends ClassVisitor
+{
+  // Internal name of the class being visited.
+  private String className;
+
+  // Internal name of the super class; a generated no-arg constructor must chain to it.
+  private String superClassName = "java/lang/Object";
+
+  // Whether the class already declares a no-arg constructor "<init>()V".
+  private boolean hasDefaultConstructor = false;
+
+  public AnnonymousClassModifier(int i)
+  {
+    super(i);
+  }
+
+  public AnnonymousClassModifier(int i, ClassVisitor classVisitor)
+  {
+    super(i, classVisitor);
+  }
+
+  @Override
+  public void visit(int version, int access, String name, String signature, String superName, String[] interfaces)
+  {
+    className = name;
+    if (superName != null) {
+      superClassName = superName;
+    }
+    // Replace the original access flags with ACC_PUBLIC | ACC_SUPER (0x21) so the class is publicly accessible.
+    super.visit(version, Opcodes.ACC_PUBLIC | Opcodes.ACC_SUPER, name, signature, superName, interfaces);
+  }
+
+  @Override
+  public void visitSource(String s, String s1)
+  {
+    super.visitSource(s, s1);
+  }
+
+  @Override
+  public void visitOuterClass(String s, String s1, String s2)
+  {
+    // skip outer class, make it top level. For now only one level anonymous class
+    return;
+  }
+
+  @Override
+  public AnnotationVisitor visitAnnotation(String s, boolean b)
+  {
+    return super.visitAnnotation(s, b);
+  }
+
+
+  @Override
+  public void visitAttribute(Attribute attribute)
+  {
+    super.visitAttribute(attribute);
+  }
+
+  @Override
+  public void visitInnerClass(String s, String s1, String s2, int i)
+  {
+    // Drop the InnerClasses entry that describes this class itself, so it is no longer marked as nested.
+    if (s.equals(className)) {
+      return;
+    }
+    super.visitInnerClass(s, s1, s2, i);
+  }
+
+  @Override
+  public FieldVisitor visitField(int i, String s, String s1, String s2, Object o)
+  {
+    return super.visitField(i, s, s1, s2, o);
+  }
+
+  @Override
+  public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions)
+  {
+    int newAccess = access;
+    if ("<init>".equals(name)) {
+      // make the constructor public (clearing any conflicting private/protected flag)
+      newAccess = (access & ~(Opcodes.ACC_PRIVATE | Opcodes.ACC_PROTECTED)) | Opcodes.ACC_PUBLIC;
+      // Only a constructor with exactly the "()V" descriptor counts; methods like run()V or <clinit> must not.
+      if ("()V".equals(desc)) {
+        hasDefaultConstructor = true;
+      }
+    }
+    return super.visitMethod(newAccess, name, desc, signature, exceptions);
+  }
+
+  @Override
+  public void visitEnd()
+  {
+    // If there is no default constructor, create one that just chains to the super class no-arg constructor.
+    if (!hasDefaultConstructor) {
+      MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
+      mv.visitCode();
+      mv.visitVarInsn(Opcodes.ALOAD, 0);
+      mv.visitMethodInsn(Opcodes.INVOKESPECIAL,
+          superClassName,
+          "<init>",
+          "()V");
+      mv.visitInsn(Opcodes.RETURN);
+      // one stack slot for "this", one local ("this")
+      mv.visitMaxs(1, 1);
+      mv.visitEnd();
+    }
+
+    super.visitEnd();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
new file mode 100644
index 0000000..61ec10b
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+
+import java.util.Map;
+
+/**
+ * A {@link ClassLoader} that defines classes from in-memory bytecode keyed by binary class name.
+ * The inherited {@link ClassLoader#loadClass(String)} consults the parent loader first. Only classes the
+ * parent cannot find reach {@link #findClass(String)} and are defined from the map.
+ */
+public class ByteArrayClassLoader extends ClassLoader
+{
+  private final Map<String, byte[]> classes;
+
+  /**
+   * @param classes bytecode keyed by binary class name; the map is referenced, not copied
+   */
+  public ByteArrayClassLoader(Map<String, byte[]> classes)
+  {
+    this.classes = classes;
+  }
+
+  /**
+   * @param classes bytecode keyed by binary class name; the map is referenced, not copied
+   * @param parent the parent class loader for delegation
+   */
+  public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent)
+  {
+    super(parent);
+    this.classes = classes;
+  }
+
+  /**
+   * Defines {@code name} from the bytecode registered for it.
+   *
+   * @throws ClassNotFoundException if no bytecode is registered for {@code name}
+   */
+  @Override
+  protected Class<?> findClass(String name) throws ClassNotFoundException
+  {
+    byte[] data = classes.get(name);
+    if (data == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, data, 0, data.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
new file mode 100644
index 0000000..5f80062
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.commons.io.IOUtils;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * Operators that wrap the functions
+ */
+public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
+{
+  private byte[] annonymousFunctionClass;
+
+  protected transient FUNCTION statelessF;
+
+  protected FUNCTION statefulF;
+
+  protected boolean stateful = false;
+
+  protected boolean isAnnonymous = false;
+
+  public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
+
+  public FunctionOperator(FUNCTION f)
+  {
+    isAnnonymous = f.getClass().isAnonymousClass();
+    if (isAnnonymous) {
+      annonymousFunctionClass = functionClassData(f);
+    } else if (f instanceof Function.Stateful) {
+      statelessF = f;
+    } else {
+      statefulF = f;
+      stateful = true;
+    }
+  }
+
+  private byte[] functionClassData(Function f)
+  {
+    Class<? extends Function> classT = f.getClass();
+
+    byte[] classBytes = null;
+    byte[] classNameBytes = null;
+    String className = classT.getName();
+    try {
+      classNameBytes = className.replace('.', '/').getBytes();
+      classBytes = IOUtils.toByteArray(classT.getClassLoader().getResourceAsStream(className.replace('.', '/') + ".class"));
+      int cursor = 0;
+      for (int j = 0; j < classBytes.length; j++) {
+        if (classBytes[j] != classNameBytes[cursor]) {
+          cursor = 0;
+        } else {
+          cursor++;
+        }
+
+        if (cursor == classNameBytes.length) {
+          for (int p = 0; p < classNameBytes.length; p++) {
+            if (classBytes[j - p] == '$') {
+              classBytes[j - p] = '_';
+            }
+          }
+          cursor = 0;
+        }
+      }
+      ClassReader cr = new ClassReader(new ByteArrayInputStream(classBytes));
+      ClassWriter cw = new ClassWriter(0);
+      AnnonymousClassModifier annonymousClassModifier = new AnnonymousClassModifier(Opcodes.ASM4, cw);
+      cr.accept(annonymousClassModifier, 0);
+      classBytes = cw.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    int dataLength = classNameBytes.length + 4 + 4;
+
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(dataLength);
+    DataOutputStream output = new DataOutputStream(byteArrayOutputStream);
+
+    try {
+      output.writeInt(classNameBytes.length);
+      output.write(className.replace('$', '_').getBytes());
+      output.writeInt(classBytes.length);
+      output.write(classBytes);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        output.flush();
+        output.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return byteArrayOutputStream.toByteArray();
+
+  }
+
+  /**
+   * Default constructor to make kryo happy
+   */
+  public FunctionOperator()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    readFunction();
+  }
+
+
+  @SuppressWarnings("unchecked")
+  private void readFunction()
+  {
+    try {
+      if (statelessF != null || statefulF != null) {
+        return;
+      }
+      DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
+      byte[] classNameBytes = new byte[input.readInt()];
+      input.read(classNameBytes);
+      String className = new String(classNameBytes);
+      byte[] classData = new byte[input.readInt()];
+      input.read(classData);
+      Map<String, byte[]> classBin = new HashMap<>();
+      classBin.put(className, classData);
+      ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader());
+      statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  public FUNCTION getFunction()
+  {
+    readFunction();
+    if (stateful) {
+      return statefulF;
+    } else {
+      return statelessF;
+    }
+  }
+
+  public FUNCTION getStatelessF()
+  {
+    return statelessF;
+  }
+
+  public void setStatelessF(FUNCTION statelessF)
+  {
+    this.statelessF = statelessF;
+  }
+
+  public FUNCTION getStatefulF()
+  {
+    return statefulF;
+  }
+
+  public void setStatefulF(FUNCTION statefulF)
+  {
+    this.statefulF = statefulF;
+  }
+
+  public boolean isStateful()
+  {
+    return stateful;
+  }
+
+  public void setStateful(boolean stateful)
+  {
+    this.stateful = stateful;
+  }
+
+  public boolean isAnnonymous()
+  {
+    return isAnnonymous;
+  }
+
+  public void setIsAnnonymous(boolean isAnnonymous)
+  {
+    this.isAnnonymous = isAnnonymous;
+  }
+
+  public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>>
+  {
+
+    public MapFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.MapFunction<IN, OUT> f = getFunction();
+        output.emit(f.f(t));
+      }
+    };
+
+    public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+  public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>>
+  {
+
+    public FlatMapFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FlatMapFunction<IN, OUT> f = getFunction();
+        for (OUT out : f.f(t)) {
+          output.emit(out);
+        }
+      }
+    };
+
+    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+  public static class FoldFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FoldFunction<IN, OUT>>
+  {
+
+    public FoldFunctionOperator()
+    {
+
+    }
+
+    @NotNull
+    private OUT foldVal;
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FoldFunction<IN, OUT> f = getFunction();
+        // fold the value
+        foldVal = f.fold(t, foldVal);
+        output.emit(foldVal);
+      }
+    };
+
+    public FoldFunctionOperator(Function.FoldFunction<IN, OUT> f, OUT initialVal)
+    {
+      super(f);
+      this.foldVal = initialVal;
+    }
+  }
+
+  public static class ReduceFunctionOperator<IN> extends FunctionOperator<IN, Function.ReduceFunction<IN>>
+  {
+
+    public ReduceFunctionOperator()
+    {
+
+    }
+
+    @NotNull
+    private IN reducedVal;
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.ReduceFunction<IN> f = getFunction();
+        // fold the value
+        if (reducedVal == null) {
+          reducedVal = t;
+          return;
+        }
+        reducedVal = f.reduce(t, reducedVal);
+        output.emit(reducedVal);
+      }
+    };
+
+    public ReduceFunctionOperator(Function.ReduceFunction<IN> f)
+    {
+      super(f);
+    }
+  }
+
+  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
+  {
+
+    public FilterFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FilterFunction<IN> f = getFunction();
+        // fold the value
+        if (f.f(t)) {
+          output.emit(t);
+        }
+      }
+    };
+
+    public FilterFunctionOperator(Function.FilterFunction<IN> f)
+    {
+      super(f);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
new file mode 100644
index 0000000..755521f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.util;
+
+/**
+ * A tuple that carries its own key.
+ * <p>
+ * It is used internally to identify the key of a tuple.
+ *
+ * @param <K> the type of the key
+ */
+public interface KeyedTuple<K>
+{
+  /**
+   * Returns the key of this tuple.
+   *
+   * @return the key of this tuple
+   */
+  K getKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
new file mode 100644
index 0000000..f6c2552
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.util;
+
+/**
+ * Home for tuple helpers: extracting the fields used as a key or value, converting data tuples
+ * into display tuples, and generating watermark tuples.
+ * <p>
+ * Currently it only declares the {@link NONE} marker type.
+ */
+public class TupleUtil
+{
+  /**
+   * Marker type with no members.
+   * NOTE(review): presumably used as a placeholder type meaning "no type"; confirm against callers.
+   */
+  public interface NONE
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
new file mode 100644
index 0000000..71b9a82
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Unit tests for {@link ApexStreamImpl}, the default implementation of the ApexStream interface.
+ */
+public class ApexStreamImplTest
+{
+
+  /**
+   * Builds a two-operator stream and calls {@code with(...)} for a locality, an attribute and a
+   * property after the second operator is added. It then checks that the logical plan applies those
+   * settings to the second operator and its input stream only.
+   */
+  @Test
+  public void testAddOperator()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    TestOperator<String, Integer> firstOperator = new TestOperator<>();
+    TestOperator<Integer, Date> secondOperator = new TestOperator<>();
+    new ApexStreamImpl<String>().addOperator("first", firstOperator, null, firstOperator.output)
+        .addOperator("second", secondOperator, secondOperator.input, null)
+        .with(DAG.Locality.THREAD_LOCAL)
+        .with(Context.OperatorContext.AUTO_RECORD, true)
+        .with("prop", "TestProp").populateDag(dag);
+    // assertEquals (unlike assertTrue(a == b)) reports the actual value when it fails
+    Assert.assertEquals(2, dag.getAllOperators().size());
+    Set<String> opNames = new HashSet<>();
+    opNames.add("first");
+    opNames.add("second");
+    for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
+      Assert.assertTrue(operatorMeta.getOperator() instanceof TestOperator);
+      Assert.assertTrue(opNames.contains(operatorMeta.getName()));
+      TestOperator<?, ?> operator = (TestOperator<?, ?>)operatorMeta.getOperator();
+      if (operatorMeta.getName().equals("second")) {
+        // Assert the Context.OperatorContext.AUTO_RECORD attribute has been set to true for second operator
+        Assert.assertTrue(operatorMeta.getAttributes().get(Context.OperatorContext.AUTO_RECORD));
+        // Assert the prop has been set to TestProp for second operator
+        Assert.assertEquals("TestProp", operator.getProp());
+      } else {
+        // Assert the Context.OperatorContext.AUTO_RECORD attribute was not set (null) for first operator
+        Assert.assertNull(operatorMeta.getAttributes().get(Context.OperatorContext.AUTO_RECORD));
+        // Assert the prop has not been set for first operator
+        Assert.assertNull(operator.getProp());
+      }
+    }
+
+    Collection<LogicalPlan.StreamMeta> streams = dag.getAllStreams();
+    // Assert there is only one stream
+    Assert.assertEquals(1, streams.size());
+    for (LogicalPlan.StreamMeta stream : streams) {
+
+      // Assert the stream is from first operator to second operator
+      Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName());
+      Assert.assertEquals(1, stream.getSinks().size());
+      Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName());
+
+      // Assert the stream is thread local
+      Assert.assertEquals(DAG.Locality.THREAD_LOCAL, stream.getLocality());
+    }
+
+  }
+
+  /**
+   * A minimal test double: one input port that ignores its tuples, one output port, and a string
+   * property that can be set through the stream API.
+   *
+   * @param <T> type of tuples accepted on {@link #input}
+   * @param <O> type of tuples declared on {@link #output}
+   */
+  public static class TestOperator<T, O> extends BaseOperator
+  {
+
+    private String prop = null;
+
+    public void setProp(String prop)
+    {
+      this.prop = prop;
+    }
+
+    public String getProp()
+    {
+      return prop;
+    }
+
+    public final transient InputPort<T> input = new DefaultInputPort<T>()
+    {
+      @Override
+      public void process(T o)
+      {
+
+      }
+    };
+
+    public final transient OutputPort<O> output = new DefaultOutputPort<>();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
new file mode 100644
index 0000000..44f76b1
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * An example word-count application built with the stream API. It reads String tuples from the
+ * files in a local folder and splits each on spaces into words. It then prints the words and
+ * prints the per-word counts.
+ */
+@ApplicationAnnotation(name = "WordCountStreamingApiDemo")
+public class ApplicationWithStreamAPI implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    String localFolder = "./src/test/resources/data";
+    // parameterized instead of the raw ApexStream type, so the element type is checked
+    ApexStream<String> stream = StreamFactory
+        .fromFolder(localFolder)
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split(" "));
+          }
+        });
+    stream.print();
+    stream.countByKey().print();
+    stream.populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
new file mode 100644
index 0000000..70f26f2
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * An application test using the stream API: runs {@link ApplicationWithStreamAPI} in local mode.
+ */
+public class ApplicationWithStreamAPITest
+{
+  private static final Logger logger = LoggerFactory.getLogger(ApplicationWithStreamAPITest.class);
+
+  /**
+   * Prepares the word-count application in local mode, runs it with {@code lc.run(5000)}, and logs
+   * how long the run took.
+   * NOTE(review): there are no assertions; the test only verifies that the run does not throw.
+   */
+  @Test
+  public void testWordcountApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(new ApplicationWithStreamAPI(), conf);
+    LocalMode.Controller lc = lma.getController();
+    // nanoTime is monotonic (unlike currentTimeMillis), so it is the right clock for elapsed time
+    long start = System.nanoTime();
+    lc.run(5000);
+    long elapsedMillis = (System.nanoTime() - start) / 1000000L;
+    logger.info("Test used {} ms", elapsedMillis);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
new file mode 100644
index 0000000..d679135
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+
+/**
+ * An embedded word-count test that runs a stream pipeline directly with {@code runEmbedded},
+ * without creating a StreamingApplication.
+ */
+public class LocalTestWithoutStreamApplication
+{
+  /** Key under which the collector's results are stored in {@code TupleCollector.results}. */
+  private static final String COLLECTOR_ID = "testNonStreamApplicationWordcount";
+
+  /**
+   * Counts words from the test data folder and checks that the last emitted count map matches the
+   * expected word counts.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testNonStreamApplicationWordcount() throws Exception
+  {
+
+    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    collector.id = COLLECTOR_ID;
+    final Map<Object, Integer> expected = new HashMap<>();
+    expected.put("error", 2);
+    expected.put("word1", 4);
+    expected.put("word2", 8);
+    expected.put("word3", 4);
+    expected.put("word4", 4);
+    expected.put("word5", 4);
+    expected.put("word7", 4);
+    expected.put("word9", 6);
+
+    // Stop as soon as the most recent count map equals the expected counts
+    Callable<Boolean> exitCondition = new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(COLLECTOR_ID);
+        return (data != null) && !data.isEmpty() && expected.equals(data.get(data.size() - 1));
+      }
+    };
+
+
+    StreamFactory.fromFolder("./src/test/resources/data")
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split(" "));
+          }
+        })
+        .countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
+
+    List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(COLLECTOR_ID);
+
+    Assert.assertNotNull(data);
+    // same success criterion as the exit condition: at least one result, the last one matching
+    Assert.assertFalse(data.isEmpty());
+    Assert.assertEquals(expected, data.get(data.size() - 1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
new file mode 100644
index 0000000..4958a8e
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * An example to create your own stream: extends {@link ApexStreamImpl} and adds a composite
+ * operation built from the inherited stream methods.
+ *
+ * @param <T> type of tuples in this stream
+ */
+public class MyStream<T> extends ApexStreamImpl<T>
+{
+
+  /**
+   * Creates a stream from an existing one by delegating to the {@code ApexStreamImpl(ApexStream)}
+   * constructor.
+   *
+   * @param apexStream the stream passed to the superclass constructor
+   */
+  public MyStream(ApexStream<T> apexStream)
+  {
+    super(apexStream);
+  }
+
+  /**
+   * Keeps only the tuples accepted by {@code filterFunction}, applies {@code map} to them, and then
+   * calls {@code with(DAG.Locality.THREAD_LOCAL)} on the result.
+   * NOTE(review): presumably this makes the filter-to-map stream thread-local; confirm in
+   * ApexStreamImpl. The return also relies on the inherited methods' generic return type being
+   * inferred as MyStream; confirm they actually return a MyStream instance at runtime.
+   *
+   * @param map function applied to each tuple that passes the filter
+   * @param filterFunction predicate selecting which tuples are kept
+   * @param <O> type of the mapped tuples
+   * @return the filtered and mapped stream
+   */
+  <O> MyStream<O> myFilterAndMap(Function.MapFunction<T, O> map, Function.FilterFunction<T> filterFunction)
+  {
+    return filter(filterFunction).map(map).with(DAG.Locality.THREAD_LOCAL);
+  }
+
+}