You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:21 UTC

[5/6] apex-malhar git commit: APEXMALHAR-2142 #comment Implement WindowedStream interface

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
index 791ce3e..eae8b15 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
@@ -27,8 +27,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.stream.api.Option;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.DAG;
@@ -36,14 +41,15 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 
 /**
- * Graph data structure for DAG
- * With this data structure, the framework can do lazy load and optimization
+ * Logical graph data structure for DAG <br>
+ *
+ * With the build method({@link #buildDAG()}, {@link #buildDAG(DAG)}) to convert it to Apex DAG
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class DagMeta
 {
-
   private List<NodeMeta> heads = new LinkedList<>();
 
   List<Pair<Attribute, Object>> dagAttributes = new LinkedList<>();
@@ -51,10 +57,10 @@ public class DagMeta
   public static class NodeMeta
   {
 
-    private String nodeName;
-
     private Operator operator;
 
+    private Option[] options;
+
     List<Pair<Attribute, Object>> operatorAttributes = new LinkedList<>();
 
     private Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> nodeStreams = new HashMap<>();
@@ -79,11 +85,6 @@ public class DagMeta
       return children;
     }
 
-    public String getNodeName()
-    {
-      return nodeName;
-    }
-
     public Operator getOperator()
     {
       return operator;
@@ -94,13 +95,13 @@ public class DagMeta
       return nodeStreams;
     }
 
-    public NodeMeta(Operator operator, String nodeName)
+    public NodeMeta(Operator operator, Option... options)
     {
 
-      this.nodeName = nodeName;
-
       this.operator = operator;
 
+      this.options = options;
+
       for (Field field : this.operator.getClass().getFields()) {
         int modifier = field.getModifiers();
         if (Modifier.isPublic(modifier) && Modifier.isTransient(modifier) &&
@@ -122,6 +123,15 @@ public class DagMeta
       }
     }
 
+    public String getOperatorName()
+    {
+      for (Option opt : options) {
+        if (opt instanceof Option.OpName) {
+          return ((Option.OpName)opt).getName();
+        }
+      }
+      return operator.toString();
+    }
   }
 
   public DagMeta()
@@ -141,11 +151,15 @@ public class DagMeta
     for (NodeMeta nm : heads) {
       visitNode(nm, dag);
     }
+    logger.debug("Finish building the dag:\n {}", dag.toString());
   }
 
   private void visitNode(NodeMeta nm, DAG dag)
   {
-    dag.addOperator(nm.nodeName, nm.operator);
+    String opName = nm.getOperatorName();
+    logger.debug("Building DAG: add operator {}: {}", opName, nm.operator);
+    dag.addOperator(opName, nm.operator);
+
     for (NodeMeta child : nm.children) {
       visitNode(child, dag);
     }
@@ -154,15 +168,18 @@ public class DagMeta
       if (entry.getKey() == null || entry.getValue().getKey() == null || 0 == entry.getValue().getKey().size()) {
         continue;
       }
+      logger.debug("Building DAG: add stream {} from {} to {}", entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{}));
       DAG.StreamMeta streamMeta = dag.addStream(entry.getKey().toString(), entry.getKey(),
           entry.getValue().getLeft().toArray(new Operator.InputPort[]{}));
       // set locality
       if (entry.getValue().getRight() != null) {
+        logger.debug("Building DAG: set locality of the stream {} to {}", entry.getKey().toString(), entry.getValue().getRight());
         streamMeta.setLocality(entry.getValue().getRight());
       }
       //set attributes for output port
       if (nm.outputPortAttributes.containsKey(entry.getKey())) {
         for (Pair<Attribute, Object> attr : nm.outputPortAttributes.get(entry.getKey())) {
+          logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), entry.getKey());
           dag.setOutputPortAttribute(entry.getKey(), attr.getLeft(), attr.getValue());
         }
       }
@@ -173,6 +190,7 @@ public class DagMeta
       //set input port attributes
       if (nm.inputPortAttributes.containsKey(input)) {
         for (Pair<Attribute, Object> attr : nm.inputPortAttributes.get(input)) {
+          logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), input);
           dag.setInputPortAttribute(input, attr.getLeft(), attr.getValue());
         }
       }
@@ -180,15 +198,16 @@ public class DagMeta
 
     // set operator attributes
     for (Pair<Attribute, Object> attr : nm.operatorAttributes) {
+      logger.debug("Building DAG: set operator attribute {} to {} for operator {}", attr.getLeft(), attr.getValue(), nm.operator);
       dag.setAttribute(nm.operator, attr.getLeft(), attr.getValue());
     }
 
   }
 
-  public NodeMeta addNode(String nodeName, Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort)
+  public NodeMeta addNode(Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort, Option... options)
   {
 
-    NodeMeta newNode = new NodeMeta(operator, nodeName);
+    NodeMeta newNode = new NodeMeta(operator, options);
     if (parent == null) {
       heads.add(newNode);
     } else {
@@ -199,4 +218,6 @@ public class DagMeta
     return newNode;
   }
 
+  private static final Logger logger = LoggerFactory.getLogger(DagMeta.class);
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
index 982980c..b5bc286 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
@@ -20,6 +20,8 @@ package org.apache.apex.malhar.stream.api.impl;
 
 import java.util.UUID;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 import com.datatorrent.api.Operator;
 
 import static java.lang.System.currentTimeMillis;
@@ -29,6 +31,7 @@ import static java.lang.System.currentTimeMillis;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class IDGenerator
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
index 7af6ece..d6201ad 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
@@ -18,39 +18,59 @@
  */
 package org.apache.apex.malhar.stream.api.impl;
 
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.PartitionStrategy;
 import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
 import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
 
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
 /**
- * A Factory class to build from different kind of input source
+ * A Factory class to build stream from different input sources
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class StreamFactory
 {
-  public static ApexStream<String> fromFolder(String inputOperatorName, String folderName)
+  /**
+   * Create a stream of string tuples from reading files in hdfs folder line by line
+   * @param folderName
+   * @param opts
+   * @return
+   */
+  public static ApexStream<String> fromFolder(String folderName, Option... opts)
   {
     LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator();
     fileLineInputOperator.setDirectory(folderName);
     ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
-    return newStream.addOperator(inputOperatorName, fileLineInputOperator, null, fileLineInputOperator.output);
+    return newStream.addOperator(fileLineInputOperator, null, fileLineInputOperator.output, opts);
   }
 
   public static ApexStream<String> fromFolder(String folderName)
   {
-    return fromFolder("FolderScanner", folderName);
+    return fromFolder(folderName, name("FolderScanner"));
   }
 
   public static ApexStream<String> fromKafka08(String zookeepers, String topic)
   {
-    return fromKafka08("Kafka08Input", zookeepers, topic);
+    return fromKafka08(zookeepers, topic, name("Kafka08Input"));
   }
 
-  public static ApexStream<String> fromKafka08(String inputName, String zookeepers, String topic)
+  /**
+   * Create a stream of string reading input from kafka 0.8
+   * @param zookeepers
+   * @param topic
+   * @param opts
+   * @return
+   */
+  public static ApexStream<String> fromKafka08(String zookeepers, String topic, Option... opts)
   {
     KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
     kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic);
@@ -59,25 +79,49 @@ public class StreamFactory
     return newStream.addOperator(kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort);
   }
 
-  public static <T> ApexStream<T> fromInput(String inputOperatorName, InputOperator operator, Operator.OutputPort<T> outputPort)
+  /**
+   * Create a stream with any input operator
+   * @param operator
+   * @param outputPort
+   * @param opts
+   * @param <T>
+   * @return
+   */
+  public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort, Option... opts)
   {
     ApexStreamImpl<T> newStream = new ApexStreamImpl<>();
-    return newStream.addOperator(inputOperatorName, operator, null, outputPort);
+    return newStream.addOperator(operator, null, outputPort, opts);
   }
 
-  public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort)
+  /**
+   * Create stream of byte array messages from kafka 0.9
+   * @param brokers
+   * @param topic
+   * @param opts
+   * @return
+   */
+  public static ApexStream<byte[]> fromKafka09(String brokers, String topic, Option... opts)
   {
-    return fromInput(operator.toString(), operator, outputPort);
+    KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+    kafkaInput.setClusters(brokers);
+    kafkaInput.setTopics(topic);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts);
   }
 
-  public static ApexStream<String> fromKafka09(String name, String brokers, String topic)
+  /**
+   * Create stream of byte array messages from kafka 0.9 with more partition options
+   */
+  public static ApexStream<byte[]> fromKafka09(String brokers, String topic, PartitionStrategy strategy, int partitionNumber, Option... opts)
   {
-    throw new UnsupportedOperationException();
+    KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+    kafkaInput.setClusters(brokers);
+    kafkaInput.setTopics(topic);
+    kafkaInput.setStrategy(strategy.name());
+    kafkaInput.setInitialPartitionCount(partitionNumber);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts);
   }
 
-  public static ApexStream<String> fromKafka09(String brokers, String topic)
-  {
-    return fromKafka09("KafkaInput", brokers, topic);
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
new file mode 100644
index 0000000..1bc500d
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+/**
+ * A wrapper operator that intercept the tuples and convert them between {@link Tuple}
+ */
+public class TupleWrapperOperator implements InputOperator, Operator.CheckpointNotificationListener
+{
+
+  public static class OutputPortWrapper extends DefaultOutputPort implements Sink
+  {
+
+    @Override
+    public void put(Object o)
+    {
+      emit(o);
+    }
+
+    @Override
+    public int getCount(boolean b)
+    {
+      // No Accumulation
+      return 0;
+    }
+  }
+
+  public static class InputPortWrapper extends DefaultInputPort<Tuple>
+  {
+
+    @NotNull
+    private DefaultInputPort input;
+
+    public void setInput(DefaultInputPort input)
+    {
+      this.input = input;
+    }
+
+    @Override
+    public void process(Tuple o)
+    {
+      input.process(o.getValue());
+    }
+
+    @Override
+    public Sink getSink()
+    {
+      return input.getSink();
+    }
+
+    @Override
+    public void setConnected(boolean connected)
+    {
+      input.setConnected(connected);
+    }
+
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      input.setup(context);
+    }
+
+    @Override
+    public void teardown()
+    {
+      input.teardown();
+    }
+  }
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output1 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output2 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output3 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output4 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output5 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input1 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input2 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input3 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input4 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input5 = new InputPortWrapper();
+
+  //delegate to
+  @NotNull
+  private Operator operator;
+
+  public void setOperator(Operator operator)
+  {
+    this.operator = operator;
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+    operator.beginWindow(l);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    operator.endWindow();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    operator.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    operator.teardown();
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).beforeCheckpoint(l);
+    }
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).checkpointed(l);
+    }
+  }
+
+  @Override
+  public void committed(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).committed(l);
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (operator instanceof InputOperator) {
+      ((InputOperator)operator).emitTuples();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
new file mode 100644
index 0000000..68f1b9e
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Count Accumulation
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0);
+  }
+
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input);
+    return accumulatedValue;
+  }
+
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2);
+    return accumulatedValue1;
+  }
+
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return accumulatedValue.getValue();
+  }
+
+  @Override
+  public Long getRetraction(Long value)
+  {
+    return -value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
new file mode 100644
index 0000000..3ab6892
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Fold Accumulation Adaptor class
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+
+  public FoldFn()
+  {
+  }
+
+  public FoldFn(OUTPUT initialVal)
+  {
+    this.initialVal = initialVal;
+  }
+
+  private OUTPUT initialVal;
+
+  @Override
+  public OUTPUT defaultAccumulatedValue()
+  {
+    return initialVal;
+  }
+
+  @Override
+  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+  {
+    return fold(accumulatedValue, input);
+  }
+
+  @Override
+  public OUTPUT getOutput(OUTPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public OUTPUT getRetraction(OUTPUT value)
+  {
+    return null;
+  }
+
+  abstract OUTPUT fold(OUTPUT result, INPUT input);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
new file mode 100644
index 0000000..b4507bc
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation
+ * @param <INPUT>
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+  @Override
+  public INPUT defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  @Override
+  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    return reduce(accumulatedValue, input);
+  }
+
+  @Override
+  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+  {
+    return reduce(accumulatedValue1, accumulatedValue2);
+  }
+
+  @Override
+  public INPUT getOutput(INPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public INPUT getRetraction(INPUT value)
+  {
+    return null;
+  }
+
+  public abstract INPUT reduce(INPUT input1, INPUT input2);
+
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
new file mode 100644
index 0000000..77a08a6
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+
+  int n;
+
+  Comparator<T> comparator;
+
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new LinkedList<>();
+  }
+
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    int k = 0;
+    for (T inMemory : accumulatedValue) {
+      if (comparator != null) {
+        if (comparator.compare(inMemory, input) < 0) {
+          break;
+        }
+      } else if (input instanceof Comparable) {
+        if (((Comparable<T>)input).compareTo(inMemory) > 0) {
+          break;
+        }
+      } else {
+        throw new RuntimeException("Tuple cannot be compared");
+      }
+      k++;
+    }
+    accumulatedValue.add(k, input);
+    if (accumulatedValue.size() > n) {
+      accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1));
+    }
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    if (comparator != null) {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+    } else {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder());
+    }
+    if (accumulatedValue1.size() > n) {
+      return accumulatedValue1.subList(0, n);
+    } else {
+      return accumulatedValue1;
+    }
+  }
+
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new LinkedList<>();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
index cc85f37..b0fe3c5 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
@@ -18,6 +18,8 @@
  */
 package org.apache.apex.malhar.stream.api.operator;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
@@ -33,6 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class AnnonymousClassModifier extends ClassVisitor
 {
   private String className;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
index 9194dc2..05a791c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
@@ -21,9 +21,12 @@ package org.apache.apex.malhar.stream.api.operator;
 
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class ByteArrayClassLoader extends ClassLoader
 {
   private final Map<String, byte[]> classes;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
index 0a8ba55..1e2066c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
@@ -26,10 +26,11 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.validation.constraints.NotNull;
-
+import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.util.TupleUtil;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
@@ -39,12 +40,15 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 
 /**
  * Operators that wrap the functions
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
 {
   private byte[] annonymousFunctionClass;
@@ -57,8 +61,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
   protected boolean isAnnonymous = false;
 
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
 
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>();
+
   public FunctionOperator(FUNCTION f)
   {
     isAnnonymous = f.getClass().isAnonymousClass();
@@ -245,6 +253,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
     }
 
+    @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
     {
       @Override
@@ -255,6 +264,21 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
       }
     };
 
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
+    {
+      @Override
+      public void process(Tuple<IN> t)
+      {
+        Function.MapFunction<IN, OUT> f = getFunction();
+        if (t instanceof Tuple.PlainTuple) {
+          TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, f.f(t.getValue()));
+        } else {
+          output.emit(f.f(t.getValue()));
+        }
+      }
+    };
+
     public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
     {
       super(f);
@@ -269,6 +293,8 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
     }
 
+
+    @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
     {
       @Override
@@ -281,93 +307,65 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
       }
     };
 
-    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
-    {
-      super(f);
-    }
-  }
-
-  public static class FoldFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FoldFunction<IN, OUT>>
-  {
-
-    public FoldFunctionOperator()
-    {
-
-    }
-
-    @NotNull
-    private OUT foldVal;
-
-    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
     {
       @Override
-      public void process(IN t)
+      public void process(Tuple<IN> t)
       {
-        Function.FoldFunction<IN, OUT> f = getFunction();
-        // fold the value
-        foldVal = f.fold(t, foldVal);
-        output.emit(foldVal);
+        Function.FlatMapFunction<IN, OUT> f = getFunction();
+        if (t instanceof Tuple.PlainTuple) {
+          for (OUT out : f.f(t.getValue())) {
+            tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out));
+          }
+        } else {
+          for (OUT out : f.f(t.getValue())) {
+            output.emit(out);
+          }
+        }
       }
     };
 
-    public FoldFunctionOperator(Function.FoldFunction<IN, OUT> f, OUT initialVal)
+    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
     {
       super(f);
-      this.foldVal = initialVal;
     }
   }
 
-  public static class ReduceFunctionOperator<IN> extends FunctionOperator<IN, Function.ReduceFunction<IN>>
+
+  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
   {
 
-    public ReduceFunctionOperator()
+    public FilterFunctionOperator()
     {
 
     }
 
-    @NotNull
-    private IN reducedVal;
-
+    @InputPortFieldAnnotation(optional = true)
     public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
     {
       @Override
       public void process(IN t)
       {
-        Function.ReduceFunction<IN> f = getFunction();
+        Function.FilterFunction<IN> f = getFunction();
         // fold the value
-        if (reducedVal == null) {
-          reducedVal = t;
-          return;
+        if (f.f(t)) {
+          output.emit(t);
         }
-        reducedVal = f.reduce(t, reducedVal);
-        output.emit(reducedVal);
       }
     };
 
-    public ReduceFunctionOperator(Function.ReduceFunction<IN> f)
-    {
-      super(f);
-    }
-  }
-
-  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
-  {
-
-    public FilterFunctionOperator()
-    {
-
-    }
-
-    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Tuple<IN>> tupleInput =  new DefaultInputPort<Tuple<IN>>()
     {
       @Override
-      public void process(IN t)
+      public void process(Tuple<IN> t)
       {
         Function.FilterFunction<IN> f = getFunction();
-        // fold the value
-        if (f.f(t)) {
-          output.emit(t);
+        if (f.f(t.getValue())) {
+          tupleOutput.emit(t);
         }
+
       }
     };
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
deleted file mode 100644
index 3641189..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.util;
-
-/**
- * An interface indicate a tuple with a specific key
- * It is used internally to identify the key from the tuple
- *
- * @since 3.4.0
- */
-public interface KeyedTuple<K>
-{
-  /**
-   * Return the key of the tuple
-   * @return
-   */
-  K getKey();
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
index 583615a..04f42b3 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -18,6 +18,11 @@
  */
 package org.apache.apex.malhar.stream.api.util;
 
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+
 /**
  * The tuple util will be used to extract fields that are used as key or value<br>
  * Or converting from data tuples to display tuples <br>
@@ -29,8 +34,22 @@ package org.apache.apex.malhar.stream.api.util;
 public class TupleUtil
 {
 
-  public static interface NONE
+  public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue)
   {
 
+    if (t instanceof Tuple.WindowedTuple) {
+      Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>();
+      List<Window> wins = ((Tuple.WindowedTuple)t).getWindows();
+      for (Window w : wins) {
+        newT.addWindow(w);
+      }
+      newT.setValue(newValue);
+      ((Tuple.WindowedTuple)t).setTimestamp(((Tuple.WindowedTuple)t).getTimestamp());
+      return newT;
+    } else if (t instanceof Tuple.TimestampedTuple) {
+      return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
+    } else {
+      return new Tuple.PlainTuple<>(newValue);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
index 34820b6..9d03f2a 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
@@ -269,7 +269,7 @@ public class FunctionOperatorTest
         = new FunctionOperator.FilterFunctionOperator<Integer>(new Function.FilterFunction<Integer>()
         {
           @Override
-          public Boolean f(Integer in)
+          public boolean f(Integer in)
           {
             return in % divider == 0;
           }
@@ -309,7 +309,7 @@ public class FunctionOperatorTest
         .filter(new Function.FilterFunction<Integer>()
         {
           @Override
-          public Boolean f(Integer in)
+          public boolean f(Integer in)
           {
             return in % divider == 0;
           }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
index 71b9a82..99d5ca6 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -33,6 +33,8 @@ import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
 /**
  * Unit test to default implementation of ApexStream interface
  */
@@ -45,8 +47,8 @@ public class ApexStreamImplTest
     LogicalPlan dag = new LogicalPlan();
     TestOperator<String, Integer> firstOperator = new TestOperator<>();
     TestOperator<Integer, Date> secondOperator = new TestOperator<>();
-    new ApexStreamImpl<String>().addOperator("first", firstOperator, null, firstOperator.output)
-        .addOperator("second", secondOperator, secondOperator.input, null)
+    new ApexStreamImpl<String>().addOperator(firstOperator, null, firstOperator.output, name("first"))
+        .endWith(secondOperator, secondOperator.input, name("second"))
         .with(DAG.Locality.THREAD_LOCAL)
         .with(Context.OperatorContext.AUTO_RECORD, true)
         .with("prop", "TestProp").populateDag(dag);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
index 44f76b1..f65806e 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -20,6 +20,11 @@ package org.apache.apex.malhar.stream.sample;
 
 import java.util.Arrays;
 
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
@@ -28,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * An application example with stream api
@@ -40,18 +46,27 @@ public class ApplicationWithStreamAPI implements StreamingApplication
   public void populateDAG(DAG dag, Configuration configuration)
   {
     String localFolder = "./src/test/resources/data";
-    ApexStream stream = StreamFactory
+    ApexStream<String> stream = StreamFactory
         .fromFolder(localFolder)
         .flatMap(new Function.FlatMapFunction<String, String>()
         {
           @Override
           public Iterable<String> f(String input)
           {
-            return Arrays.asList(input.split(" "));
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
           }
         });
     stream.print();
-    stream.countByKey().print();
+    stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration
+        .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+          }
+        }).print();
     stream.populateDag(dag);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
index d679135..f46fb14 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
@@ -27,9 +27,13 @@ import java.util.concurrent.Callable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * A embedded application test without creating Streaming Application
@@ -41,25 +45,32 @@ public class LocalTestWithoutStreamApplication
   public void testNonStreamApplicationWordcount() throws Exception
   {
 
-    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
     collector.id = "testNonStreamApplicationWordcount";
-    final Map<Object, Integer> expected = new HashMap<>();
-    expected.put("error", 2);
-    expected.put("word1", 4);
-    expected.put("word2", 8);
-    expected.put("word3", 4);
-    expected.put("word4", 4);
-    expected.put("word5", 4);
-    expected.put("word7", 4);
-    expected.put("word9", 6);
+    final Map<String, Long> expected = new HashMap<>();
+    expected.put("error", 2L);
+    expected.put("word1", 4L);
+    expected.put("word2", 8L);
+    expected.put("word3", 4L);
+    expected.put("word4", 4L);
+    expected.put("word5", 4L);
+    expected.put("word7", 4L);
+    expected.put("word9", 6L);
 
     Callable<Boolean> exitCondition = new Callable<Boolean>()
     {
       @Override
       public Boolean call() throws Exception
       {
-        List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
-        return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
+        if (!TupleCollector.results.containsKey("testNonStreamApplicationWordcount") || TupleCollector.results.get("testNonStreamApplicationWordcount").isEmpty()) {
+          return false;
+        }
+        Map<String, Long> data = new HashMap<>();
+        for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry :
+            (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) {
+          data.put(entry.getValue().getKey(), entry.getValue().getValue());
+        }
+        return data.size() >= 8 && expected.equals(data);
       }
     };
 
@@ -73,13 +84,26 @@ public class LocalTestWithoutStreamApplication
             return Arrays.asList(input.split(" "));
           }
         })
-        .countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+          }
+        }).addOperator(collector, collector.inputPort, null).runEmbedded(false, 30000, exitCondition);
 
+    Map<String, Long> data = new HashMap<>();
 
-    List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
+    for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry :
+        (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) {
+      data.put(entry.getValue().getKey(), entry.getValue().getValue());
+    }
 
+    //Thread.sleep(100000);
     Assert.assertNotNull(data);
     Assert.assertTrue(data.size() > 1);
-    Assert.assertEquals(expected, data.get(data.size() - 1));
+    Assert.assertEquals(expected, data);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
index 4958a8e..20d7aed 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.stream.sample;
 
-import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
 
@@ -30,7 +29,7 @@ import com.datatorrent.api.DAG;
 public class MyStream<T> extends ApexStreamImpl<T>
 {
 
-  public MyStream(ApexStream<T> apexStream)
+  public MyStream(ApexStreamImpl<T> apexStream)
   {
     super(apexStream);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
index b2d1e8b..5e48974 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
@@ -24,37 +24,51 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
 import org.apache.apex.malhar.stream.api.impl.StreamFactory;
 
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * A test class which test your own stream implementation build on default one
  */
 @SuppressWarnings("unchecked")
 public class MyStreamTest
 {
-  static Map<Object, Integer> expected = new HashMap<>();
+  static Map<String, Long> expected = new HashMap<>();
   static String testId = null;
   static Callable<Boolean> exitCondition = null;
   static {
-    expected.put("newword1", 4);
-    expected.put("newword2", 8);
-    expected.put("newword3", 4);
-    expected.put("newword4", 4);
-    expected.put("newword5", 4);
-    expected.put("newword7", 4);
-    expected.put("newword9", 6);
+    expected.put("newword1", 4L);
+    expected.put("newword2", 8L);
+    expected.put("newword3", 4L);
+    expected.put("newword4", 4L);
+    expected.put("newword5", 4L);
+    expected.put("newword7", 4L);
+    expected.put("newword9", 6L);
 
     exitCondition = new Callable<Boolean>()
     {
       @Override
       public Boolean call() throws Exception
       {
-        List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
-        return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
+        if (!TupleCollector.results.containsKey(testId) || TupleCollector.results.get(testId).isEmpty()) {
+          return false;
+        }
+        Map<String, Long> dataMap = new HashMap<>();
+        List<Tuple.TimestampedTuple<KeyValPair<String, Long>>> data = (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId);
+        for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : data) {
+          dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+        }
+        return (dataMap != null) && dataMap.size() >= 1 && expected.equals(dataMap);
       }
     };
   }
@@ -65,9 +79,9 @@ public class MyStreamTest
 
     testId = "testMethodChainWordcount";
 
-    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
     collector.id = testId;
-    new MyStream<>(StreamFactory.fromFolder("./src/test/resources/data"))
+    new MyStream<>((ApexStreamImpl<String>)StreamFactory.fromFolder("./src/test/resources/data"))
         .<String, MyStream<String>>flatMap(new Function.FlatMapFunction<String, String>()
         {
           @Override
@@ -85,17 +99,28 @@ public class MyStreamTest
         }, new Function.FilterFunction<String>()
         {
           @Override
-          public Boolean f(String input)
+          public boolean f(String input)
           {
             return input.startsWith("word");
           }
-        }).countByKey()
-        .addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+        }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000)))
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+          }
+        }).addOperator(collector, collector.inputPort, null)
+        .runEmbedded(false, 30000, exitCondition);
 
+    Map<String, Long> dataMap = new HashMap<>();
+    for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) {
+      dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+    }
 
-    List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
-    Assert.assertTrue(data.size() > 1);
-    Assert.assertEquals(expected, data.get(data.size() - 1));
+    Assert.assertTrue(dataMap.size() > 1);
+    Assert.assertEquals(expected, dataMap);
   }
 
   @Test
@@ -103,9 +128,9 @@ public class MyStreamTest
   {
     testId = "testNonMethodChainWordcount";
 
-    TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+    TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
     collector.id = testId;
-    MyStream<String> mystream = new MyStream<>(StreamFactory
+    MyStream<String> mystream = new MyStream<>((ApexStreamImpl<String>)StreamFactory
         .fromFolder("./src/test/resources/data"))
         .flatMap(new Function.FlatMapFunction<String, String>()
         {
@@ -125,16 +150,28 @@ public class MyStreamTest
     }, new Function.FilterFunction<String>()
     {
       @Override
-      public Boolean f(String input)
+      public boolean f(String input)
       {
         return input.startsWith("word");
       }
-    }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+    }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000)))
+    .countByKey(new Function.ToKeyValue<String, String, Long>()
+    {
+      @Override
+      public Tuple<KeyValPair<String, Long>> f(String input)
+      {
+        return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+      }
+    }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
 
+    Map<String, Long> dataMap = new HashMap<>();
+    for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) {
+      dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+    }
 
-    List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
-    Assert.assertTrue(data.size() > 1);
-    Assert.assertEquals(expected, data.get(data.size() - 1));
+    Assert.assertTrue(dataMap.size() > 1);
+    Assert.assertEquals(expected, dataMap);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
index a5c644d..94667c9 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 
 /**
@@ -37,6 +38,7 @@ public class TupleCollector<T> extends BaseOperator
 
   public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>(this);
 
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();
 
   public String id = "";

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
new file mode 100644
index 0000000..14ff066
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+public class WCInput extends BaseOperator implements InputOperator
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+  private transient BufferedReader reader;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    initReader();
+  }
+
+  private void initReader()
+  {
+    try {
+      Path myPath = new Path("/user/siyuan/wc/wordcount");
+      FileSystem fs = FileSystem.get(new Configuration());
+      reader = new BufferedReader(new InputStreamReader(fs.open(myPath)));
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    IOUtils.closeQuietly(reader);
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    try {
+      String line = reader.readLine();
+      if (line == null) {
+        reader.close();
+        initReader();
+      } else {
+        // simulate late data
+        //long timestamp = System.currentTimeMillis() - (long)(Math.random() * 30000);
+
+        this.output.emit(line);
+      }
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    //this.controlOutput.emit(new WatermarkImpl(System.currentTimeMillis() - 15000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
new file mode 100644
index 0000000..11dabe4
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Word count with streaming API
+ */
+@ApplicationAnnotation(name = "WCDemo")
+public class WordCountWithStreamAPI implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    WCInput wcInput = new WCInput();
+    ApexStream<String> stream = StreamFactory
+        .fromInput(wcInput, wcInput.output)
+        .flatMap(new Function.FlatMapFunction<String, String>()
+        {
+          @Override
+          public Iterable<String> f(String input)
+          {
+            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+          }
+        });
+    stream.print();
+    stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration
+        .millis(1000)).accumulatingFiredPanes())
+        .countByKey(new Function.ToKeyValue<String, String, Long>()
+        {
+          @Override
+          public Tuple<KeyValPair<String, Long>> f(String input)
+          {
+            return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+          }
+        }).print();
+    stream.populateDag(dag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/stream/src/test/resources/sampletweets.txt b/stream/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..379d424
--- /dev/null
+++ b/stream/src/test/resources/sampletweets.txt
@@ -0,0 +1,207 @@
+Tweet content
+"Apple has $233 billion in cash. It could buy all
+
+\u2014@NFL teams
+\u2014@NBA teams
+\u2014@MLB teams
+\u2014@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Read $MRNJ #NEWS, $HEMP &amp; $GRCU r above .01, that's where we r goin\U0001f680 $SPY $MSFT $SBUX $SFOR $VRX $AAPL $TSLA $GOOG $FB $EURUSD $USDJPY $MLCG
+$RXSF is leadin their sector accordin 2 @EdisonMediaCen $AAPL $SPY $TSLA $FB $EURUSD $ALK $IBB $EW $AMZN $GBPUSD $GM https://t.co/LYY2mHn755
+Philstockworld Top Trade Review $AAPL $MSFT #Dividends $USO $HOV $TWTR -- https://t.co/JArXsIm7CI https://t.co/kRR9ezhm9E
+Philstockworld Top Trade Review: $AAPL $ABX $BA $CAKE $CMG $DIS $IBM $GILD $LL $UNG $SPY -- https://t.co/EX5SYjdwBC https://t.co/7FBZwVZ63v
+"Monday\u2019s Oil Mess: Rent-A-Rebel Jacks up Prices into the Holiday $USO $AAPL 
+#Earnings -- https://t.co/cGHB3WDKA8 https://t.co/JFZIBcom1n"
+Meaningless Monday Market Movement! $AAPL $SQQQ #oil #Brexit https://t.co/j4Iqg7E1HN
+"S&amp;P Futures Back over 2,050, for Now
+$SPY $AAPL $SQQQ #China #Debt #Hedging -- https://t.co/2dOc5T89S3 https://t.co/TDPVdNRNQF"
+"\U0001f4a5TURN YOUR $500 INTO $5,000+\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE\u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $GSAT $MGT 
+https://t.co/lwAGjfmIP3"
+Trendless Tuesday - Watch Yesterday\u2019s Fake Gains Disappear $AAPL #China $FXI #Earnings -- https://t.co/GpgGqoOlFn https://t.co/FRuixv5aZF
+"\U0001f4a5TURN YOUR $500 INTO $5,000+\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE\u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $UVXY $JDST
+https://t.co/lwAGjfmIP3"
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla 
+Twitter 
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+Option Opportunity Portfolio May Review \u2013 Up 19.3% In 30 Days! $ABX $FCX $USO $AAPL $DIS - https://t.co/rp3kMsRZ3E https://t.co/TKkc15pKcR
+Waiting for the Fed \u2013 Apple Gives Us Huge Wins: $AAPL $SQQQ #GDP #Nikkei #Futures #Oil -- https://t.co/Al3pkf350V https://t.co/LktIRF4F2b
+Tempting Tuesday - S&amp;P 2,100 is Still the Line to Watch Ahead of the Fed $AAPL $QQQ -- https://t.co/t1eDfKHJnk https://t.co/BAW3RAe7SC
+Our $SQQQ Hedge is Up 314% and Our Futures Are Up $4,850, You're Welcome!  $AAPL -- https://t.co/eUQ2kCkCOY https://t.co/Yk98oyqMZl
+"TURN YOUR \U0001f4b2500 INTO \U0001f4b25,000$\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE \u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TWTR $AAPL $LNKD
+https://t.co/euJFNQX1g4"
+"TURN YOUR \U0001f4b2500 INTO \U0001f4b25,000$\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE \u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TALK $PPPI $AAPL https://t.co/oSn11kxftM"
+Bears today. We getting paid! $AAPL $TWTR $BWLD $NFLX https://t.co/CCi0S3skJJ
+"Apple has $233 billion in cash. It could buy all
+
+\u2014@NFL teams
+\u2014@NBA teams
+\u2014@MLB teams
+\u2014@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Are you in Sync with the market? https://t.co/ZtHHCrSAf8 #stocks #finance #investing #trading $AAPL $LNKD $NFLX  $GOOGL $FB
+The Last Time These Insiders Purchased This Stock It Sky Rocketed 1000%+ https://t.co/bmNAHBoQBD $DIA $QQQ $SPY $GOOG $AAPL $BAC $TWTR $FB
+"This Hacker Made Amazon\u2019s Alexa, Google Now, and Siri Command One Another
+https://t.co/YXP3yqmf4H $AAPL $AMZN $GOOG https://t.co/NG7r6qgfRt"
+"Over the last 3 years, the top 14 automakers upped their combined R&amp;D spend by $192 million. 
+
+$AAPL upped R&amp;D spend by $5 billion. 
+
+- MS"
+Volatility can be your friend. https://t.co/aHz2r8HHD2 #stocks #trading #investing #financials #learntodaytrade $FB $AAPL $LNKD $NFLX
+"PERCENTAGE of Apple's Revenues:
+FY 2006: 
+iPod 40%
+Mac 38%
+Services 10%
+Others 12%
+
+FY 2015:
+iPhone 66%
+Mac 11%
+iPad 10%
+Others 13%
+
+$AAPL"
+Apple recovered $40 million worth of gold from recycled iPhones, iPads &amp; Macs in 2015. https://t.co/XPBWlM6cBs $AAPL https://t.co/P0LMSRw7Ot
+"Apple's iPhone sales sink for 1st time ever last quarter
+https://t.co/TAKjUwl4Yc  @DavidGoldmanCNN @cnntech  $AAPL https://t.co/OrDp4BDpsD"
+$BAC is down 5% since our article was posted on Friday https://t.co/al8AgaSsiI $DIA $QQQ $SPY $AAPL $GOOG $FB $TWTR $STUDY $NFLX $LNKD $IBM
+Ben Franklin: The First Proponent Of Dividend Growth Investing? https://t.co/dx7FE2G9AH $AAPL $ACN $AL $BEN $CSV $HON $IJR $JNJ $JWN $PEGI
+$5,000 Friday the 13th - Yesterday's Futures Trades Pay Off Nicely $USO $SPY $AAPL  -- https://t.co/3RUEjAq1bO https://t.co/2L7cdebTlT
+I DON'T SEE ANY BUBBLE RIGHT NOW , I SWEAR ! $SPX $SPY $DIA $DJI $AAPL  $VIX $TVIX $C $BAC $GM $GE $FB #STOCKMARKET https://t.co/E5954RIpC7
+Terrible $AAPL quarter, finally. On the way to becoming $NOK. Tech is mean reverting, today's leaders are almost always tomorrow's laggards.
+The iPhone 7S could look radically different from the iPhones of today https://t.co/eQxUMAZ4eM $AAPL https://t.co/HIH3QqKpIC
+"No Bull: The Evidence
+https://t.co/Md2SNpjdwd
+$SPX $MSFT $GOOGL $AAPL $NFLX $AMZN $FB $DIS $V $BAC $GS $WMT $SBUX https://t.co/1oISHNX4cJ"
+The iPhone 7S could look radically different from the iPhones of today https://t.co/KgeVSjmcGe $AAPL https://t.co/7hFtg37oJu
+There was a 3rd Apple founder, Ronald Wayne, who sold his 10% stake for $800 in 1976. Today his share would've been worth $65 Billion. $AAPL
+Twitter Stock Set to Breakout Soon https://t.co/u4V6ChhpOW $TWTR $DIA $QQQ $SPY $AAPL $GLD $GDX $NUGT $DUST $BAC $GOOG $FB $STUDY $NFLX $IBM
+Alibaba Stock Price Breaks The 50 Day Moving Average https://t.co/ABOVWI6j2G $BABA $AAPL $YHOO $COST $UWTI $CSC $MON https://t.co/VlWGDxrQXh
+I still can\u2019t shake the feeling that $AAPL is slowly taking themselves private. https://t.co/XIAMvppDWh https://t.co/kdMGCGbMaJ
+$SPX ROADMAP 2016 #STOCKMARKET $INTC $F $SPY $AAPL $AMZN $C $VIX $FB $TWTR $GOOGL $UVXY $FAZ $FEZ $MSFT $GS $BAC $AA https://t.co/owuQ9awcDw
+"Want to know why $GOOG is so impressive and why $AAPL is so fucked? Read this years founders' letter from $GOOG:
+
+https://t.co/LiBjGZwyKw"
+"GET READY. Here are the companies reporting earnings next week: https://t.co/NXptPkQX70
+
+$AAPL $FB $TWTR $CMG $GILD https://t.co/tcIoCZdOZi"
+$SPX THIS TIME IT'S DIFFERENT! $SPY $DIA $SDOW $S  $FAZ $FEZ $AAPL $MSFT $BAC $C $JPM $GS $SIRI $AMZN $F $VIX $TVIX https://t.co/pkYVgNKv3P
+$SPX ROADMAP 2016 #STOCKS $TVIX $VXX $VALE $AAPL $AKS $FCX $MSFT $AA $MU $VIX $SPX $SPY #TRADING $PCLN $SIRI $ MCD https://t.co/6UH5He38h1
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/s8iQOvPQeR $AAPL https://t.co/QmQROtQ9vY
+11/ For example, buy an Echo and see your behavior change. The future is happening, and $AAPL seems, to me, asleep.
+$RLYP $SPY $KORS $WDAY $MSFT $AAPL $QLIK $TIVO $NXPI $CPXX $AVGO $ZOES $LE $TICC $SLB $FCEL $VRA $MLNX $ASNA $ICPT https://t.co/LXMpz4rFG0
+#STOCKMARKET GRAVITY LESSONS: what goes up  must come down $SPX $SPY $DIA $QQQ $TVIX $VIX $AAPL $C $FB $PCLN $BAC $F https://t.co/8HQHBEgSj5
+Should Icahn's exit or Buffett's entry affect your $AAPL judgment? The Big Name Effect. https://t.co/9Z2ok61MUh https://t.co/udAQLfdJFe
+Apple revenue drops 13 percent, ending 13 years of growth. Greater China was especially weak, down 26 percent. $AAPL https://t.co/q4ovXUenBU
+It was a $18 billion day for Apple. https://t.co/iRbGeoTmCJ $AAPL
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla 
+Twitter 
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+#3 TOP 2111.05 #STOCKS #STOCKMARKET #TRADING $SPX $SPY $VIX $TVIX $AAPL $SIRI $C $BAC $JPM $AMZN $MSFT $FB $TWTR $F https://t.co/gSqmN0fVON
+Google #IO16: Android's failure to innovate hands a Apple free run at WWDC $GOOG $AAPL https://t.co/FTs9M8JD5g https://t.co/20uou1gUkW
+$SPX 2134.72..2116.48...2111.05 HOUSTON WE HAVE A PROBLEM ! #STOCKMARKET $VIX $SPY $DIA $AAPL $C $BAC $FB $VXX $MSFT https://t.co/du3QfPUM4Q
+top #earnings $FB $AAPL $AMZN $TWTR $CMG $F $GILD $LNKD $FCX $CELG $SWKS $JBLU $T $NXPI $BA  https://t.co/lObOE0uRjZ https://t.co/94F6GJc3hE
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/ZVeQ9a4Yrh $AAPL https://t.co/2Ntpbxwlyo
+You do not want to miss this incredibly candid look into $AAPL w/ @tim_cook! Tune into @MadMoneyOnCNBC on @CNBC now! https://t.co/budv4qfvju
+Foxconn axes 60,000 jobs in one Chinese factory as robots take over: https://t.co/BnFdjGCmLf $AAPL https://t.co/WhRHer8jdN
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/nXmvK6PV7M https://t.co/MAcMz0iTg6
+Apple is about to report its worst quarter in 13 years on Tuesday https://t.co/NJ3hwunHCx $AAPL https://t.co/YLTmnpqNjI
+Everyone who wants an iPhone has one. $AAPL is now a consumer staple stock and will trade on replacement / shareholder yield.
+Financial Armageddon\u2019 is imminent, the next major crash will happen in 2016 $VXX $VIX $TVIX  $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+"Apple is NO longer the largest US stock by market cap. Google is: https://t.co/i81Y83jQJC
+
+$GOOGL $AAPL https://t.co/cRCKRYBICS"
+Exclusive: Apple hires former Tesla VP Chris Porritt for \u2018special [car] project\u2019 https://t.co/7knsloxvJW $TSLA $AAPL https://t.co/X8cYztExoP
+$SPX on the top of downtrend Channel Be careful! #STOCKMARKET $SPY $AAPL $AMZN $TSLA $FB $QQQ $DIA $NFLX $PCLN $C $F https://t.co/UKZCyLYuBq
+UPDATE: Apple CEO Cook says in conference call that smartphone marker is 'currently not growing' $AAPL https://t.co/WeECmrdv1j
+In February Charlie Munger was asked why Berkshire owns $GM. The $AAPL stake isn't anymore complicated than this: https://t.co/Rwkb30OEgq
+Talking to @SquawkStreet about $AAPL &amp; more at @NYSE ! https://t.co/m05b68VLMp
+iPhone sales sour #Apple's earning: https://t.co/962fj9SWsc $AAPL https://t.co/nz9FRK6sNK
+People aren\u2019t upgrading smartphones as quickly and that is bad for Apple https://t.co/EOEJPfNR8Z \U0001f513 $AAPL
+"$NXPI $JBLU $FCX $AAPL $CMG
+$TWTR $EBAY $BWLD $PNRA $CRUS
+$FB $FSLR $UPS $CELG $AMZN
+$LNKD $BIDU $SWKS $GILD $HELE https://t.co/rQUmhHgYn0"
+People mad that Icahn sold $AAPL without giving them the head\u2019s up - How much in commissions did you pay him this year?
+Cool stat: $AAPL's $46 billion loss in market cap overnight is greater than the market cap of 391 S&amp;P 500 companies https://t.co/1ms1YZzTbP
+Apple. You've come a long way... https://t.co/WGvk8K8MYv $AAPL https://t.co/3Wo0hAwRAc
+"Someone is building the Internet's biggest list of stock market cliches and it's amazing: https://t.co/mIV169cF36
+
+$SPY $AAPL $EURUSD"
+JUST IN: Apple delays earnings release by one day, to April 26th after the bell. \u2022 $AAPL
+Apple's market value is down nearly three Twitters $AAPL $TWTR
+Trump warns of a tech bubble: https://t.co/6Ks1yTa4Zc $AAPL $FB $AMZN He's 100% right about this. https://t.co/dJgTLk5JOB
+Apple could sell its billionth iPhone in just a few months' time https://t.co/g6VYDFIE3d $AAPL https://t.co/jzucmxDYXe
+$SPX  KEEP BLOWING #STOCKMARKET #BUBBLE #STOCKS $MSFT $GS $AAPL $SPY $DIA $DJI $C $SIRI $PCLN  $BAC $JPM $VIX $TVIX https://t.co/GPFBb0uCLF
+Will Apple $AAPL fall from tree? 12-mo descending triangle. I've no interest to short it, but it will be wild ride https://t.co/AnjsIKmIHI
+Tim Cook shouldn't be doing TV w/out a new product. Looks desperate. Not a consumer-facing guy. $AAPL https://t.co/Z4UFSimTLg
+When will Apple will sell its billionth iPhone? It may be sooner than you think: https://t.co/5IaF018N1p $AAPL https://t.co/cCIgtKqWHA
+#Stockmarket downtrend continues next week $spx  $spy $vix $tvix $dji $aapl $jpm $bac $c $msft $pcln $wmt $ba https://t.co/1TTlgnKnZc
+$AAPL https://t.co/AFANPYHnoq
+40 years ago this month, Apple co-founder Ronald Wayne sold his 10% stake in $AAPL for $800. Current value: $61 billion.
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/rXWwuyIooI https://t.co/TztgKCcWWy
+Apple's iBooks and iTunes Movies in China have been shut down after less than 7 months https://t.co/ZuGXZqSHma $AAPL https://t.co/1OHGC9YiUf
+Possible buy on $AAPL as it drops onto it's 9 DEMA support #1Broker #Bitcoin #Blockchain https://t.co/WWssD01joh https://t.co/jOKJyG9EaJ
+"Apple is down 7% after earnings.
+
+That's about $40 BILLION in market cap gone in 30 minutes. Poof.
+
+$AAPL: https://t.co/ggfmPjJjkW"
+B4 CRASH 2008 - Paulson's speech:" OUR FINANCIAL SYSTEM IS STRONG" $VXX $VIX $TVIX $UVXY $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+$ONCI is ready to RUN this week! #stockmarket #pennystocks #parabolic $CDNL $MGT $GOOGL $AAPL $TSLA $TWTR $ONCI https://t.co/wwqf0RNOix
+Apple could sell its billionth iPhone in just a few months' time https://t.co/u2qFZ440dH $AAPL https://t.co/8cAchiZ0vC
+The iPhone might radically change in 2017 $AAPL https://t.co/IXLdCfEdus https://t.co/GpdMvFZPjE
+"The growth of smartphones. On one graph.
+
+A great share via: https://t.co/2hAJlarjSM
+
+$AAPL $GOOGL $MSFT https://t.co/BAwQRvYzou"
+"$AAPL finished last quarter with $232 billion in cash, meanwhile Kanye running up debts making records for Tidal. 
+
+Bro."
+Which is bullish for $AAPL if you know anything about $GS https://t.co/WWssD01joh  https://t.co/CQk8iKMI7w
+"The tech stocks with the MOST revenue
+
+1. $AAPL
+2. $AMZN
+3. $MSFT
+
+Visual by @OphirGottlieb: https://t.co/GpZ5ct2z5r https://t.co/H6sNKdtBHd"
+