You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/25 16:43:21 UTC
[5/6] apex-malhar git commit: APEXMALHAR-2142 #comment Implement
WindowedStream interface
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
index 791ce3e..eae8b15 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
@@ -27,8 +27,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.stream.api.Option;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
@@ -36,14 +41,15 @@ import com.datatorrent.api.Operator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
/**
- * Graph data structure for DAG
- * With this data structure, the framework can do lazy load and optimization
+ * Logical graph data structure for a DAG. <br>
+ *
+ * Use the build methods ({@link #buildDAG()}, {@link #buildDAG(DAG)}) to convert it to an Apex DAG.
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class DagMeta
{
-
private List<NodeMeta> heads = new LinkedList<>();
List<Pair<Attribute, Object>> dagAttributes = new LinkedList<>();
@@ -51,10 +57,10 @@ public class DagMeta
public static class NodeMeta
{
- private String nodeName;
-
private Operator operator;
+ private Option[] options;
+
List<Pair<Attribute, Object>> operatorAttributes = new LinkedList<>();
private Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> nodeStreams = new HashMap<>();
@@ -79,11 +85,6 @@ public class DagMeta
return children;
}
- public String getNodeName()
- {
- return nodeName;
- }
-
public Operator getOperator()
{
return operator;
@@ -94,13 +95,13 @@ public class DagMeta
return nodeStreams;
}
- public NodeMeta(Operator operator, String nodeName)
+ public NodeMeta(Operator operator, Option... options)
{
- this.nodeName = nodeName;
-
this.operator = operator;
+ this.options = options;
+
for (Field field : this.operator.getClass().getFields()) {
int modifier = field.getModifiers();
if (Modifier.isPublic(modifier) && Modifier.isTransient(modifier) &&
@@ -122,6 +123,15 @@ public class DagMeta
}
}
+    /**
+     * Resolve the name under which this node's operator is added to the DAG.
+     *
+     * @return the name carried by the first {@link Option.OpName} among the options,
+     *         or {@code operator.toString()} when no such option is present
+     */
+    public String getOperatorName()
+    {
+      for (int i = 0; i < options.length; i++) {
+        Option candidate = options[i];
+        if (!(candidate instanceof Option.OpName)) {
+          continue;
+        }
+        // First naming option wins; later ones are never inspected.
+        Option.OpName named = (Option.OpName)candidate;
+        return named.getName();
+      }
+      return operator.toString();
+    }
}
public DagMeta()
@@ -141,11 +151,15 @@ public class DagMeta
for (NodeMeta nm : heads) {
visitNode(nm, dag);
}
+ logger.debug("Finish building the dag:\n {}", dag.toString());
}
private void visitNode(NodeMeta nm, DAG dag)
{
- dag.addOperator(nm.nodeName, nm.operator);
+ String opName = nm.getOperatorName();
+ logger.debug("Building DAG: add operator {}: {}", opName, nm.operator);
+ dag.addOperator(opName, nm.operator);
+
for (NodeMeta child : nm.children) {
visitNode(child, dag);
}
@@ -154,15 +168,18 @@ public class DagMeta
if (entry.getKey() == null || entry.getValue().getKey() == null || 0 == entry.getValue().getKey().size()) {
continue;
}
+ logger.debug("Building DAG: add stream {} from {} to {}", entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{}));
DAG.StreamMeta streamMeta = dag.addStream(entry.getKey().toString(), entry.getKey(),
entry.getValue().getLeft().toArray(new Operator.InputPort[]{}));
// set locality
if (entry.getValue().getRight() != null) {
+ logger.debug("Building DAG: set locality of the stream {} to {}", entry.getKey().toString(), entry.getValue().getRight());
streamMeta.setLocality(entry.getValue().getRight());
}
//set attributes for output port
if (nm.outputPortAttributes.containsKey(entry.getKey())) {
for (Pair<Attribute, Object> attr : nm.outputPortAttributes.get(entry.getKey())) {
+ logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), entry.getKey());
dag.setOutputPortAttribute(entry.getKey(), attr.getLeft(), attr.getValue());
}
}
@@ -173,6 +190,7 @@ public class DagMeta
//set input port attributes
if (nm.inputPortAttributes.containsKey(input)) {
for (Pair<Attribute, Object> attr : nm.inputPortAttributes.get(input)) {
+ logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), input);
dag.setInputPortAttribute(input, attr.getLeft(), attr.getValue());
}
}
@@ -180,15 +198,16 @@ public class DagMeta
// set operator attributes
for (Pair<Attribute, Object> attr : nm.operatorAttributes) {
+ logger.debug("Building DAG: set operator attribute {} to {} for operator {}", attr.getLeft(), attr.getValue(), nm.operator);
dag.setAttribute(nm.operator, attr.getLeft(), attr.getValue());
}
}
- public NodeMeta addNode(String nodeName, Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort)
+ public NodeMeta addNode(Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort, Option... options)
{
- NodeMeta newNode = new NodeMeta(operator, nodeName);
+ NodeMeta newNode = new NodeMeta(operator, options);
if (parent == null) {
heads.add(newNode);
} else {
@@ -199,4 +218,6 @@ public class DagMeta
return newNode;
}
+ private static final Logger logger = LoggerFactory.getLogger(DagMeta.class);
+
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
index 982980c..b5bc286 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
@@ -20,6 +20,8 @@ package org.apache.apex.malhar.stream.api.impl;
import java.util.UUID;
+import org.apache.hadoop.classification.InterfaceStability;
+
import com.datatorrent.api.Operator;
import static java.lang.System.currentTimeMillis;
@@ -29,6 +31,7 @@ import static java.lang.System.currentTimeMillis;
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class IDGenerator
{
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
index 7af6ece..d6201ad 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
@@ -18,39 +18,59 @@
*/
package org.apache.apex.malhar.stream.api.impl;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.PartitionStrategy;
import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.Option;
+import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
/**
- * A Factory class to build from different kind of input source
+ * A Factory class to build stream from different input sources
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class StreamFactory
{
- public static ApexStream<String> fromFolder(String inputOperatorName, String folderName)
+ /**
+ * Create a stream of string tuples by reading the files in an HDFS folder line by line
+ * @param folderName
+ * @param opts
+ * @return
+ */
+ public static ApexStream<String> fromFolder(String folderName, Option... opts)
{
LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator();
fileLineInputOperator.setDirectory(folderName);
ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
- return newStream.addOperator(inputOperatorName, fileLineInputOperator, null, fileLineInputOperator.output);
+ return newStream.addOperator(fileLineInputOperator, null, fileLineInputOperator.output, opts);
}
public static ApexStream<String> fromFolder(String folderName)
{
- return fromFolder("FolderScanner", folderName);
+ return fromFolder(folderName, name("FolderScanner"));
}
public static ApexStream<String> fromKafka08(String zookeepers, String topic)
{
- return fromKafka08("Kafka08Input", zookeepers, topic);
+ return fromKafka08(zookeepers, topic, name("Kafka08Input"));
}
- public static ApexStream<String> fromKafka08(String inputName, String zookeepers, String topic)
+ /**
+ * Create a stream of strings read from Kafka 0.8
+ * @param zookeepers
+ * @param topic
+ * @param opts
+ * @return
+ */
+ public static ApexStream<String> fromKafka08(String zookeepers, String topic, Option... opts)
{
KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic);
@@ -59,25 +79,49 @@ public class StreamFactory
return newStream.addOperator(kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort);
}
- public static <T> ApexStream<T> fromInput(String inputOperatorName, InputOperator operator, Operator.OutputPort<T> outputPort)
+ /**
+ * Create a stream with any input operator
+ * @param operator
+ * @param outputPort
+ * @param opts
+ * @param <T>
+ * @return
+ */
+ public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort, Option... opts)
{
ApexStreamImpl<T> newStream = new ApexStreamImpl<>();
- return newStream.addOperator(inputOperatorName, operator, null, outputPort);
+ return newStream.addOperator(operator, null, outputPort, opts);
}
- public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort)
+ /**
+ * Create stream of byte array messages from kafka 0.9
+ * @param brokers
+ * @param topic
+ * @param opts
+ * @return
+ */
+ public static ApexStream<byte[]> fromKafka09(String brokers, String topic, Option... opts)
{
- return fromInput(operator.toString(), operator, outputPort);
+ KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+ kafkaInput.setClusters(brokers);
+ kafkaInput.setTopics(topic);
+ ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+ return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts);
}
- public static ApexStream<String> fromKafka09(String name, String brokers, String topic)
+ /**
+ * Create stream of byte array messages from kafka 0.9 with more partition options
+ */
+ public static ApexStream<byte[]> fromKafka09(String brokers, String topic, PartitionStrategy strategy, int partitionNumber, Option... opts)
{
- throw new UnsupportedOperationException();
+ KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+ kafkaInput.setClusters(brokers);
+ kafkaInput.setTopics(topic);
+ kafkaInput.setStrategy(strategy.name());
+ kafkaInput.setInitialPartitionCount(partitionNumber);
+ ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+ return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts);
}
- public static ApexStream<String> fromKafka09(String brokers, String topic)
- {
- return fromKafka09("KafkaInput", brokers, topic);
- }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
new file mode 100644
index 0000000..1bc500d
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+/**
+ * A wrapper operator that forwards every lifecycle and checkpoint callback to a delegate
+ * {@link Operator} and exposes wrapper ports whose input side hands the value of each
+ * incoming {@link Tuple} to a wrapped input port.
+ */
+public class TupleWrapperOperator implements InputOperator, Operator.CheckpointNotificationListener
+{
+
+  /**
+   * Output port that doubles as a {@link Sink}: every object put into the sink is emitted
+   * on this port unchanged.
+   */
+  public static class OutputPortWrapper extends DefaultOutputPort implements Sink
+  {
+
+    @Override
+    public void put(Object o)
+    {
+      emit(o);
+    }
+
+    @Override
+    public int getCount(boolean b)
+    {
+      // No Accumulation
+      return 0;
+    }
+  }
+
+  /**
+   * Input port of {@link Tuple}s that forwards to a wrapped {@link DefaultInputPort}.
+   * {@link #process(Tuple)} passes on only the tuple's value; sink lookup, connection state,
+   * setup and teardown are delegated to the wrapped port as they are.
+   */
+  public static class InputPortWrapper extends DefaultInputPort<Tuple>
+  {
+
+    // The wrapped port. Every method below dereferences it, so it has to be set first.
+    @NotNull
+    private DefaultInputPort input;
+
+    /**
+     * @param input the port that receives the unwrapped values
+     */
+    public void setInput(DefaultInputPort input)
+    {
+      this.input = input;
+    }
+
+    @Override
+    public void process(Tuple o)
+    {
+      input.process(o.getValue());
+    }
+
+    @Override
+    public Sink getSink()
+    {
+      return input.getSink();
+    }
+
+    @Override
+    public void setConnected(boolean connected)
+    {
+      input.setConnected(connected);
+    }
+
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      input.setup(context);
+    }
+
+    @Override
+    public void teardown()
+    {
+      input.teardown();
+    }
+  }
+
+  // Output ports carry the output-port annotation; the input-port annotation does not
+  // describe them.
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output1 = new OutputPortWrapper();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output2 = new OutputPortWrapper();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output3 = new OutputPortWrapper();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output4 = new OutputPortWrapper();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient OutputPortWrapper output5 = new OutputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input1 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input2 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input3 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input4 = new InputPortWrapper();
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient InputPortWrapper input5 = new InputPortWrapper();
+
+  //delegate to
+  @NotNull
+  private Operator operator;
+
+  /**
+   * @param operator the operator that receives all forwarded callbacks
+   */
+  public void setOperator(Operator operator)
+  {
+    this.operator = operator;
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+    operator.beginWindow(l);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    operator.endWindow();
+  }
+
+  /**
+   * Forwards setup to the delegate so that it is initialised with the operator context.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // Must forward setup itself: calling endWindow() here would leave the delegate
+    // uninitialised and close a window that was never opened.
+    operator.setup(context);
+  }
+
+  @Override
+  public void teardown()
+  {
+    operator.teardown();
+  }
+
+  /**
+   * Forwarded only when the delegate is itself a checkpoint listener; otherwise a no-op.
+   */
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).beforeCheckpoint(l);
+    }
+  }
+
+  /**
+   * Forwarded only when the delegate is itself a checkpoint listener; otherwise a no-op.
+   */
+  @Override
+  public void checkpointed(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).checkpointed(l);
+    }
+  }
+
+  /**
+   * Forwarded only when the delegate is itself a checkpoint listener; otherwise a no-op.
+   */
+  @Override
+  public void committed(long l)
+  {
+    if (operator instanceof CheckpointNotificationListener) {
+      ((CheckpointNotificationListener)operator).committed(l);
+    }
+  }
+
+  /**
+   * Forwarded only when the delegate is an {@link InputOperator}; otherwise nothing is emitted.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (operator instanceof InputOperator) {
+      ((InputOperator)operator).emitTuples();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
new file mode 100644
index 0000000..68f1b9e
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.commons.lang3.mutable.MutableLong;
+
+/**
+ * Count accumulation. Keeps a running {@code long} total in a {@link MutableLong}: every
+ * input value is added to the total, and the retraction of an output is its negation.
+ */
+public class Count implements Accumulation<Long, MutableLong, Long>
+{
+
+  /**
+   * @return a new accumulator starting at zero
+   */
+  @Override
+  public MutableLong defaultAccumulatedValue()
+  {
+    return new MutableLong(0L);
+  }
+
+  /**
+   * Adds {@code input} to the running total in place.
+   *
+   * @return the same accumulator instance that was passed in
+   */
+  @Override
+  public MutableLong accumulate(MutableLong accumulatedValue, Long input)
+  {
+    accumulatedValue.add(input.longValue());
+    return accumulatedValue;
+  }
+
+  /**
+   * Adds the second total onto the first in place.
+   *
+   * @return {@code accumulatedValue1}, now holding the combined total
+   */
+  @Override
+  public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2)
+  {
+    accumulatedValue1.add(accumulatedValue2.longValue());
+    return accumulatedValue1;
+  }
+
+  /**
+   * @return the current total as a boxed {@code Long}
+   */
+  @Override
+  public Long getOutput(MutableLong accumulatedValue)
+  {
+    return Long.valueOf(accumulatedValue.longValue());
+  }
+
+  /**
+   * @return the negation of {@code value}
+   */
+  @Override
+  public Long getRetraction(Long value)
+  {
+    long negated = -value.longValue();
+    return negated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
new file mode 100644
index 0000000..3ab6892
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * Fold Accumulation adaptor: the accumulated value and the output share one type, each input
+ * is folded into the accumulated value through {@link #fold(Object, Object)}, and the
+ * accumulated value is emitted as-is. {@code merge} is not provided here and is left to
+ * subclasses.
+ */
+public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT>
+{
+
+  // Seed returned by defaultAccumulatedValue(); stays null when the no-arg constructor is used.
+  // NOTE(review): the same instance is returned on every call, so a mutable seed would be
+  // shared between accumulations - confirm this is intended.
+  private OUTPUT initialVal;
+
+  /**
+   * Creates a fold without a seed value.
+   */
+  public FoldFn()
+  {
+  }
+
+  /**
+   * @param initialVal the value every accumulation starts from
+   */
+  public FoldFn(OUTPUT initialVal)
+  {
+    this.initialVal = initialVal;
+  }
+
+  /**
+   * @return the seed given at construction, or {@code null} if none was given
+   */
+  @Override
+  public OUTPUT defaultAccumulatedValue()
+  {
+    return this.initialVal;
+  }
+
+  /**
+   * @return the result of folding {@code input} into {@code accumulatedValue}
+   */
+  @Override
+  public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input)
+  {
+    OUTPUT folded = fold(accumulatedValue, input);
+    return folded;
+  }
+
+  /**
+   * @return {@code accumulatedValue} unchanged
+   */
+  @Override
+  public OUTPUT getOutput(OUTPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Retraction is not supported by this adaptor.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public OUTPUT getRetraction(OUTPUT value)
+  {
+    return null;
+  }
+
+  /**
+   * Combine the running result with one more input.
+   * NOTE(review): this method is package-private, so only subclasses in this package can
+   * implement it - confirm whether it should be public like {@code ReduceFn.reduce}.
+   *
+   * @param result the value accumulated so far
+   * @param input the next input
+   * @return the new accumulated value
+   */
+  abstract OUTPUT fold(OUTPUT result, INPUT input);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
new file mode 100644
index 0000000..b4507bc
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * An easy to use reduce Accumulation: input, accumulated value and output share one type,
+ * and values are combined pairwise through {@link #reduce(Object, Object)}. An empty
+ * accumulation is represented by {@code null}.
+ *
+ * @param <INPUT> the type of the inputs and of the reduced result
+ */
+public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT>
+{
+  /**
+   * @return {@code null}, the marker for "nothing accumulated yet"
+   */
+  @Override
+  public INPUT defaultAccumulatedValue()
+  {
+    return null;
+  }
+
+  /**
+   * @return {@code input} itself when nothing has been accumulated yet, otherwise the
+   *         reduction of the accumulated value with {@code input}
+   */
+  @Override
+  public INPUT accumulate(INPUT accumulatedValue, INPUT input)
+  {
+    if (accumulatedValue == null) {
+      return input;
+    }
+    return reduce(accumulatedValue, input);
+  }
+
+  /**
+   * Merge two accumulated values. A {@code null} side stands for an empty accumulation and
+   * is skipped, mirroring {@link #accumulate(Object, Object)}, so {@code reduce} never
+   * receives the empty marker.
+   *
+   * @return the other side when one side is {@code null} ({@code null} when both are),
+   *         otherwise the reduction of both values
+   */
+  @Override
+  public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2)
+  {
+    if (accumulatedValue1 == null) {
+      return accumulatedValue2;
+    }
+    if (accumulatedValue2 == null) {
+      return accumulatedValue1;
+    }
+    return reduce(accumulatedValue1, accumulatedValue2);
+  }
+
+  /**
+   * @return {@code accumulatedValue} unchanged
+   */
+  @Override
+  public INPUT getOutput(INPUT accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * Retraction is not supported by this adaptor.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public INPUT getRetraction(INPUT value)
+  {
+    return null;
+  }
+
+  /**
+   * Combine two values into one.
+   *
+   * @param input1 the first value
+   * @param input2 the second value
+   * @return the reduced value
+   */
+  public abstract INPUT reduce(INPUT input1, INPUT input2);
+
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
new file mode 100644
index 0000000..77a08a6
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl.accumulation;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+
+/**
+ * TopN accumulation: keeps at most {@code n} inputs in a list ordered from largest to
+ * smallest. Ordering comes from the configured comparator or, when none is set, from the
+ * inputs' own {@link Comparable} implementation.
+ */
+public class TopN<T> implements Accumulation<T, List<T>, List<T>>
+{
+
+  int n;
+
+  Comparator<T> comparator;
+
+  /**
+   * @param n the maximum number of elements to keep
+   */
+  public void setN(int n)
+  {
+    this.n = n;
+  }
+
+  /**
+   * @param comparator the ordering to rank inputs by; when left unset, inputs must be
+   *                   {@link Comparable}
+   */
+  public void setComparator(Comparator<T> comparator)
+  {
+    this.comparator = comparator;
+  }
+
+  /**
+   * @return a new empty list
+   */
+  @Override
+  public List<T> defaultAccumulatedValue()
+  {
+    return new LinkedList<>();
+  }
+
+  /**
+   * Insert {@code input} in front of the first kept element it outranks, then drop the
+   * lowest-ranked element if the list has grown beyond {@code n}.
+   *
+   * @return the same list instance that was passed in
+   * @throws RuntimeException if no comparator is set and {@code input} is not
+   *         {@link Comparable} (only detected when the list is non-empty)
+   */
+  @Override
+  public List<T> accumulate(List<T> accumulatedValue, T input)
+  {
+    int k = 0;
+    for (T inMemory : accumulatedValue) {
+      if (outranks(input, inMemory)) {
+        break;
+      }
+      k++;
+    }
+    accumulatedValue.add(k, input);
+    if (accumulatedValue.size() > n) {
+      // Evict by index: remove(Object) would delete the first element that equals() the
+      // last one, which can be a higher-ranked entry when equals and the ordering disagree.
+      accumulatedValue.remove(accumulatedValue.size() - 1);
+    }
+    return accumulatedValue;
+  }
+
+  /**
+   * Tell whether {@code input} ranks strictly above {@code inMemory}.
+   *
+   * @throws RuntimeException if no comparator is set and {@code input} is not comparable
+   */
+  @SuppressWarnings("unchecked")
+  private boolean outranks(T input, T inMemory)
+  {
+    if (comparator != null) {
+      return comparator.compare(inMemory, input) < 0;
+    }
+    if (input instanceof Comparable) {
+      return ((Comparable<T>)input).compareTo(inMemory) > 0;
+    }
+    throw new RuntimeException("Tuple cannot be compared");
+  }
+
+  /**
+   * Merge the second list into the first, sort largest first, and cut the result to
+   * {@code n} elements.
+   *
+   * @return {@code accumulatedValue1}, trimmed in place
+   */
+  @Override
+  public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2)
+  {
+    accumulatedValue1.addAll(accumulatedValue2);
+    if (comparator != null) {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator));
+    } else {
+      Collections.sort(accumulatedValue1, Collections.reverseOrder());
+    }
+    if (accumulatedValue1.size() > n) {
+      // Trim in place instead of returning a subList view, which would stay backed by the
+      // untrimmed list and keep the discarded elements alive.
+      accumulatedValue1.subList(n, accumulatedValue1.size()).clear();
+    }
+    return accumulatedValue1;
+  }
+
+  /**
+   * @return {@code accumulatedValue} unchanged
+   */
+  @Override
+  public List<T> getOutput(List<T> accumulatedValue)
+  {
+    return accumulatedValue;
+  }
+
+  /**
+   * @return always a new empty list
+   */
+  @Override
+  public List<T> getRetraction(List<T> accumulatedValue)
+  {
+    return new LinkedList<>();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
index cc85f37..b0fe3c5 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
@@ -18,6 +18,8 @@
*/
package org.apache.apex.malhar.stream.api.operator;
+import org.apache.hadoop.classification.InterfaceStability;
+
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
@@ -33,6 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class AnnonymousClassModifier extends ClassVisitor
{
private String className;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
index 9194dc2..05a791c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
@@ -21,9 +21,12 @@ package org.apache.apex.malhar.stream.api.operator;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceStability;
+
/**
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class ByteArrayClassLoader extends ClassLoader
{
private final Map<String, byte[]> classes;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
index 0a8ba55..1e2066c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
@@ -26,10 +26,11 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import javax.validation.constraints.NotNull;
-
+import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.util.TupleUtil;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
@@ -39,12 +40,15 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
/**
* Operators that wrap the functions
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
{
private byte[] annonymousFunctionClass;
@@ -57,8 +61,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
protected boolean isAnnonymous = false;
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>();
+
public FunctionOperator(FUNCTION f)
{
isAnnonymous = f.getClass().isAnonymousClass();
@@ -245,6 +253,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
}
+ @InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
{
@Override
@@ -255,6 +264,21 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
}
};
+ /**
+ * Optional input port for values that arrive wrapped in a {@link Tuple}.
+ * <p>
+ * The map function is applied to the wrapped value. When the incoming tuple is a
+ * {@link Tuple.PlainTuple}, the mapped value is re-wrapped by {@link TupleUtil#buildOf}
+ * and emitted on {@code tupleOutput}; for any other {@link Tuple} implementation the
+ * mapped value is emitted unwrapped on {@code output}.
+ */
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>()
+ {
+ @Override
+ public void process(Tuple<IN> t)
+ {
+ Function.MapFunction<IN, OUT> f = getFunction();
+ OUT mapped = f.f(t.getValue());
+ if (t instanceof Tuple.PlainTuple) {
+ // The rebuilt tuple must be emitted; building it without emitting silently drops the result.
+ tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, mapped));
+ } else {
+ output.emit(mapped);
+ }
+ }
+ };
+
public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
{
super(f);
@@ -269,6 +293,8 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
}
+
+ @InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
{
@Override
@@ -281,93 +307,65 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
}
};
- public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
- {
- super(f);
- }
- }
-
- public static class FoldFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FoldFunction<IN, OUT>>
- {
-
- public FoldFunctionOperator()
- {
-
- }
-
- @NotNull
- private OUT foldVal;
-
- public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>()
{
@Override
- public void process(IN t)
+ public void process(Tuple<IN> t)
{
- Function.FoldFunction<IN, OUT> f = getFunction();
- // fold the value
- foldVal = f.fold(t, foldVal);
- output.emit(foldVal);
+ Function.FlatMapFunction<IN, OUT> f = getFunction();
+ if (t instanceof Tuple.PlainTuple) {
+ for (OUT out : f.f(t.getValue())) {
+ tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out));
+ }
+ } else {
+ for (OUT out : f.f(t.getValue())) {
+ output.emit(out);
+ }
+ }
}
};
- public FoldFunctionOperator(Function.FoldFunction<IN, OUT> f, OUT initialVal)
+ public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
{
super(f);
- this.foldVal = initialVal;
}
}
- public static class ReduceFunctionOperator<IN> extends FunctionOperator<IN, Function.ReduceFunction<IN>>
+
+ public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
{
- public ReduceFunctionOperator()
+ public FilterFunctionOperator()
{
}
- @NotNull
- private IN reducedVal;
-
+ @InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
{
@Override
public void process(IN t)
{
- Function.ReduceFunction<IN> f = getFunction();
+ Function.FilterFunction<IN> f = getFunction();
// fold the value
- if (reducedVal == null) {
- reducedVal = t;
- return;
+ if (f.f(t)) {
+ output.emit(t);
}
- reducedVal = f.reduce(t, reducedVal);
- output.emit(reducedVal);
}
};
- public ReduceFunctionOperator(Function.ReduceFunction<IN> f)
- {
- super(f);
- }
- }
-
- public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
- {
-
- public FilterFunctionOperator()
- {
-
- }
-
- public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>()
{
@Override
- public void process(IN t)
+ public void process(Tuple<IN> t)
{
Function.FilterFunction<IN> f = getFunction();
- // fold the value
- if (f.f(t)) {
- output.emit(t);
+ if (f.f(t.getValue())) {
+ tupleOutput.emit(t);
}
+
}
};
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
deleted file mode 100644
index 3641189..0000000
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.api.util;
-
-/**
- * An interface indicate a tuple with a specific key
- * It is used internally to identify the key from the tuple
- *
- * @since 3.4.0
- */
-public interface KeyedTuple<K>
-{
- /**
- * Return the key of the tuple
- * @return
- */
- K getKey();
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
index 583615a..04f42b3 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -18,6 +18,11 @@
*/
package org.apache.apex.malhar.stream.api.util;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+
/**
* The tuple util will be used to extract fields that are used as key or value<br>
* Or converting from data tuples to display tuples <br>
@@ -29,8 +34,22 @@ package org.apache.apex.malhar.stream.api.util;
public class TupleUtil
{
- public static interface NONE
+ /**
+ * Builds a new tuple of the same kind as {@code t} that carries {@code newValue}.
+ * <p>
+ * A {@link Tuple.WindowedTuple} yields a new windowed tuple with the same windows and
+ * timestamp; a {@link Tuple.TimestampedTuple} yields a new timestamped tuple with the same
+ * timestamp; any other {@link Tuple.PlainTuple} yields a plain tuple holding only the value.
+ * The source tuple {@code t} is not modified.
+ *
+ * @param t the tuple whose windows/timestamp are copied
+ * @param newValue the value the returned tuple should carry
+ * @param <T> value type of the source tuple
+ * @param <O> value type of the returned tuple
+ * @return a new tuple wrapping {@code newValue}
+ */
+ public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue)
{
+ if (t instanceof Tuple.WindowedTuple) {
+ Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>();
+ List<Window> wins = ((Tuple.WindowedTuple)t).getWindows();
+ for (Window w : wins) {
+ newT.addWindow(w);
+ }
+ newT.setValue(newValue);
+ // Copy the timestamp onto the NEW tuple; assigning it back to the source tuple is a no-op
+ // and leaves the result without the original timestamp.
+ newT.setTimestamp(((Tuple.WindowedTuple)t).getTimestamp());
+ return newT;
+ } else if (t instanceof Tuple.TimestampedTuple) {
+ return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue);
+ } else {
+ return new Tuple.PlainTuple<>(newValue);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
index 34820b6..9d03f2a 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java
@@ -269,7 +269,7 @@ public class FunctionOperatorTest
= new FunctionOperator.FilterFunctionOperator<Integer>(new Function.FilterFunction<Integer>()
{
@Override
- public Boolean f(Integer in)
+ public boolean f(Integer in)
{
return in % divider == 0;
}
@@ -309,7 +309,7 @@ public class FunctionOperatorTest
.filter(new Function.FilterFunction<Integer>()
{
@Override
- public Boolean f(Integer in)
+ public boolean f(Integer in)
{
return in % divider == 0;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
index 71b9a82..99d5ca6 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -33,6 +33,8 @@ import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
/**
* Unit test to default implementation of ApexStream interface
*/
@@ -45,8 +47,8 @@ public class ApexStreamImplTest
LogicalPlan dag = new LogicalPlan();
TestOperator<String, Integer> firstOperator = new TestOperator<>();
TestOperator<Integer, Date> secondOperator = new TestOperator<>();
- new ApexStreamImpl<String>().addOperator("first", firstOperator, null, firstOperator.output)
- .addOperator("second", secondOperator, secondOperator.input, null)
+ new ApexStreamImpl<String>().addOperator(firstOperator, null, firstOperator.output, name("first"))
+ .endWith(secondOperator, secondOperator.input, name("second"))
.with(DAG.Locality.THREAD_LOCAL)
.with(Context.OperatorContext.AUTO_RECORD, true)
.with("prop", "TestProp").populateDag(dag);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
index 44f76b1..f65806e 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -20,6 +20,11 @@ package org.apache.apex.malhar.stream.sample;
import java.util.Arrays;
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
@@ -28,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
/**
* An application example with stream api
@@ -40,18 +46,27 @@ public class ApplicationWithStreamAPI implements StreamingApplication
public void populateDAG(DAG dag, Configuration configuration)
{
String localFolder = "./src/test/resources/data";
- ApexStream stream = StreamFactory
+ ApexStream<String> stream = StreamFactory
.fromFolder(localFolder)
.flatMap(new Function.FlatMapFunction<String, String>()
{
@Override
public Iterable<String> f(String input)
{
- return Arrays.asList(input.split(" "));
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
}
});
stream.print();
- stream.countByKey().print();
+ stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration
+ .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+ }
+ }).print();
stream.populateDag(dag);
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
index d679135..f46fb14 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
@@ -27,9 +27,13 @@ import java.util.concurrent.Callable;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import com.datatorrent.lib.util.KeyValPair;
/**
* A embedded application test without creating Streaming Application
@@ -41,25 +45,32 @@ public class LocalTestWithoutStreamApplication
public void testNonStreamApplicationWordcount() throws Exception
{
- TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+ TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
collector.id = "testNonStreamApplicationWordcount";
- final Map<Object, Integer> expected = new HashMap<>();
- expected.put("error", 2);
- expected.put("word1", 4);
- expected.put("word2", 8);
- expected.put("word3", 4);
- expected.put("word4", 4);
- expected.put("word5", 4);
- expected.put("word7", 4);
- expected.put("word9", 6);
+ final Map<String, Long> expected = new HashMap<>();
+ expected.put("error", 2L);
+ expected.put("word1", 4L);
+ expected.put("word2", 8L);
+ expected.put("word3", 4L);
+ expected.put("word4", 4L);
+ expected.put("word5", 4L);
+ expected.put("word7", 4L);
+ expected.put("word9", 6L);
Callable<Boolean> exitCondition = new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
- List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
- return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
+ if (!TupleCollector.results.containsKey("testNonStreamApplicationWordcount") || TupleCollector.results.get("testNonStreamApplicationWordcount").isEmpty()) {
+ return false;
+ }
+ Map<String, Long> data = new HashMap<>();
+ for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry :
+ (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) {
+ data.put(entry.getValue().getKey(), entry.getValue().getValue());
+ }
+ return data.size() >= 8 && expected.equals(data);
}
};
@@ -73,13 +84,26 @@ public class LocalTestWithoutStreamApplication
return Arrays.asList(input.split(" "));
}
})
- .countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+ .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+ }
+ }).addOperator(collector, collector.inputPort, null).runEmbedded(false, 30000, exitCondition);
+ Map<String, Long> data = new HashMap<>();
- List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
+ for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry :
+ (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) {
+ data.put(entry.getValue().getKey(), entry.getValue().getValue());
+ }
+ //Thread.sleep(100000);
Assert.assertNotNull(data);
Assert.assertTrue(data.size() > 1);
- Assert.assertEquals(expected, data.get(data.size() - 1));
+ Assert.assertEquals(expected, data);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
index 4958a8e..20d7aed 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.stream.sample;
-import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
@@ -30,7 +29,7 @@ import com.datatorrent.api.DAG;
public class MyStream<T> extends ApexStreamImpl<T>
{
- public MyStream(ApexStream<T> apexStream)
+ public MyStream(ApexStreamImpl<T> apexStream)
{
super(apexStream);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
index b2d1e8b..5e48974 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
@@ -24,37 +24,51 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import com.datatorrent.lib.util.KeyValPair;
+
/**
* A test class which test your own stream implementation build on default one
*/
@SuppressWarnings("unchecked")
public class MyStreamTest
{
- static Map<Object, Integer> expected = new HashMap<>();
+ static Map<String, Long> expected = new HashMap<>();
static String testId = null;
static Callable<Boolean> exitCondition = null;
static {
- expected.put("newword1", 4);
- expected.put("newword2", 8);
- expected.put("newword3", 4);
- expected.put("newword4", 4);
- expected.put("newword5", 4);
- expected.put("newword7", 4);
- expected.put("newword9", 6);
+ expected.put("newword1", 4L);
+ expected.put("newword2", 8L);
+ expected.put("newword3", 4L);
+ expected.put("newword4", 4L);
+ expected.put("newword5", 4L);
+ expected.put("newword7", 4L);
+ expected.put("newword9", 6L);
exitCondition = new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
- List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
- return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
+ if (!TupleCollector.results.containsKey(testId) || TupleCollector.results.get(testId).isEmpty()) {
+ return false;
+ }
+ Map<String, Long> dataMap = new HashMap<>();
+ List<Tuple.TimestampedTuple<KeyValPair<String, Long>>> data = (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId);
+ for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : data) {
+ dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+ }
+ return (dataMap != null) && dataMap.size() >= 1 && expected.equals(dataMap);
}
};
}
@@ -65,9 +79,9 @@ public class MyStreamTest
testId = "testMethodChainWordcount";
- TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+ TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
collector.id = testId;
- new MyStream<>(StreamFactory.fromFolder("./src/test/resources/data"))
+ new MyStream<>((ApexStreamImpl<String>)StreamFactory.fromFolder("./src/test/resources/data"))
.<String, MyStream<String>>flatMap(new Function.FlatMapFunction<String, String>()
{
@Override
@@ -85,17 +99,28 @@ public class MyStreamTest
}, new Function.FilterFunction<String>()
{
@Override
- public Boolean f(String input)
+ public boolean f(String input)
{
return input.startsWith("word");
}
- }).countByKey()
- .addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+ }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000)))
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+ }
+ }).addOperator(collector, collector.inputPort, null)
+ .runEmbedded(false, 30000, exitCondition);
+ Map<String, Long> dataMap = new HashMap<>();
+ for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) {
+ dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+ }
- List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
- Assert.assertTrue(data.size() > 1);
- Assert.assertEquals(expected, data.get(data.size() - 1));
+ Assert.assertTrue(dataMap.size() > 1);
+ Assert.assertEquals(expected, dataMap);
}
@Test
@@ -103,9 +128,9 @@ public class MyStreamTest
{
testId = "testNonMethodChainWordcount";
- TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+ TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>();
collector.id = testId;
- MyStream<String> mystream = new MyStream<>(StreamFactory
+ MyStream<String> mystream = new MyStream<>((ApexStreamImpl<String>)StreamFactory
.fromFolder("./src/test/resources/data"))
.flatMap(new Function.FlatMapFunction<String, String>()
{
@@ -125,16 +150,28 @@ public class MyStreamTest
}, new Function.FilterFunction<String>()
{
@Override
- public Boolean f(String input)
+ public boolean f(String input)
{
return input.startsWith("word");
}
- }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+ }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000)))
+ .countByKey(new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+ }
+ }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
+ Map<String, Long> dataMap = new HashMap<>();
+ for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) {
+ dataMap.put(entry.getValue().getKey(), entry.getValue().getValue());
+ }
- List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId);
- Assert.assertTrue(data.size() > 1);
- Assert.assertEquals(expected, data.get(data.size() - 1));
+ Assert.assertTrue(dataMap.size() > 1);
+ Assert.assertEquals(expected, dataMap);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
index a5c644d..94667c9 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java
@@ -25,6 +25,7 @@ import java.util.Map;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
/**
@@ -37,6 +38,7 @@ public class TupleCollector<T> extends BaseOperator
public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>(this);
+ @OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();
public String id = "";
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
new file mode 100644
index 0000000..14ff066
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Sample input operator for the word count demo. It reads a text file line by line,
+ * emits each line on {@link #output}, and reopens the file once the end is reached.
+ */
+public class WCInput extends BaseOperator implements InputOperator
+{
+ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+
+ private transient BufferedReader reader;
+
+ /**
+ * Opens the input file when the operator is set up.
+ */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ initReader();
+ }
+
+ /**
+ * Opens the hard-coded input path and points {@link #reader} at it.
+ * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
+ */
+ private void initReader()
+ {
+ try {
+ Path inputFile = new Path("/user/siyuan/wc/wordcount");
+ FileSystem fileSystem = FileSystem.get(new Configuration());
+ InputStreamReader streamReader = new InputStreamReader(fileSystem.open(inputFile));
+ reader = new BufferedReader(streamReader);
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ /**
+ * Closes the reader, ignoring any error raised while closing.
+ */
+ @Override
+ public void teardown()
+ {
+ IOUtils.closeQuietly(reader);
+ }
+
+ /**
+ * Emits the next line of the file, or reopens the file when no line is left.
+ * An {@link IOException} is rethrown wrapped in a {@link RuntimeException}.
+ */
+ @Override
+ public void emitTuples()
+ {
+ try {
+ final String nextLine = reader.readLine();
+ if (nextLine != null) {
+ this.output.emit(nextLine);
+ return;
+ }
+ // End of file: close the exhausted reader and start over from the beginning.
+ reader.close();
+ initReader();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Nothing is emitted at the end of a streaming window.
+ */
+ @Override
+ public void endWindow()
+ {
+ // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
new file mode 100644
index 0000000..11dabe4
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+
+import org.joda.time.Duration;
+
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Sample streaming application that counts words with the stream API.
+ */
+@ApplicationAnnotation(name = "WCDemo")
+public class WordCountWithStreamAPI implements StreamingApplication
+{
+
+ /**
+ * Reads lines from a {@link WCInput}, splits them into words on punctuation and whitespace,
+ * prints the words, and prints per-word counts computed over a global window whose trigger
+ * is configured with early firings every 1000 ms and accumulating fired panes.
+ *
+ * @param dag the DAG the stream is populated into
+ * @param configuration not used by this application
+ */
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ // Splits one input line into words.
+ final Function.FlatMapFunction<String, String> splitIntoWords = new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
+ }
+ };
+ // Maps a word to a (word, 1) pair so occurrences can be summed per key.
+ final Function.ToKeyValue<String, String, Long> wordToSingleCount = new Function.ToKeyValue<String, String, Long>()
+ {
+ @Override
+ public Tuple<KeyValPair<String, Long>> f(String input)
+ {
+ return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
+ }
+ };
+
+ WCInput source = new WCInput();
+ ApexStream<String> words = StreamFactory.fromInput(source, source.output).flatMap(splitIntoWords);
+ words.print();
+
+ TriggerOption everySecond = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes();
+ words.window(new WindowOption.GlobalWindow(), everySecond).countByKey(wordToSingleCount).print();
+ words.populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/resources/sampletweets.txt
----------------------------------------------------------------------
diff --git a/stream/src/test/resources/sampletweets.txt b/stream/src/test/resources/sampletweets.txt
new file mode 100644
index 0000000..379d424
--- /dev/null
+++ b/stream/src/test/resources/sampletweets.txt
@@ -0,0 +1,207 @@
+Tweet content
+"Apple has $233 billion in cash. It could buy all
+
+\u2014@NFL teams
+\u2014@NBA teams
+\u2014@MLB teams
+\u2014@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Read $MRNJ #NEWS, $HEMP & $GRCU r above .01, that's where we r goin\U0001f680 $SPY $MSFT $SBUX $SFOR $VRX $AAPL $TSLA $GOOG $FB $EURUSD $USDJPY $MLCG
+$RXSF is leadin their sector accordin 2 @EdisonMediaCen $AAPL $SPY $TSLA $FB $EURUSD $ALK $IBB $EW $AMZN $GBPUSD $GM https://t.co/LYY2mHn755
+Philstockworld Top Trade Review $AAPL $MSFT #Dividends $USO $HOV $TWTR -- https://t.co/JArXsIm7CI https://t.co/kRR9ezhm9E
+Philstockworld Top Trade Review: $AAPL $ABX $BA $CAKE $CMG $DIS $IBM $GILD $LL $UNG $SPY -- https://t.co/EX5SYjdwBC https://t.co/7FBZwVZ63v
+"Monday\u2019s Oil Mess: Rent-A-Rebel Jacks up Prices into the Holiday $USO $AAPL
+#Earnings -- https://t.co/cGHB3WDKA8 https://t.co/JFZIBcom1n"
+Meaningless Monday Market Movement! $AAPL $SQQQ #oil #Brexit https://t.co/j4Iqg7E1HN
+"S&P Futures Back over 2,050, for Now
+$SPY $AAPL $SQQQ #China #Debt #Hedging -- https://t.co/2dOc5T89S3 https://t.co/TDPVdNRNQF"
+"\U0001f4a5TURN YOUR $500 INTO $5,000+\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE\u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $GSAT $MGT
+https://t.co/lwAGjfmIP3"
+Trendless Tuesday - Watch Yesterday\u2019s Fake Gains Disappear $AAPL #China $FXI #Earnings -- https://t.co/GpgGqoOlFn https://t.co/FRuixv5aZF
+"\U0001f4a5TURN YOUR $500 INTO $5,000+\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE\u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $AAPL $UVXY $JDST
+https://t.co/lwAGjfmIP3"
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla
+Twitter
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+Option Opportunity Portfolio May Review \u2013 Up 19.3% In 30 Days! $ABX $FCX $USO $AAPL $DIS - https://t.co/rp3kMsRZ3E https://t.co/TKkc15pKcR
+Waiting for the Fed \u2013 Apple Gives Us Huge Wins: $AAPL $SQQQ #GDP #Nikkei #Futures #Oil -- https://t.co/Al3pkf350V https://t.co/LktIRF4F2b
+Tempting Tuesday - S&P 2,100 is Still the Line to Watch Ahead of the Fed $AAPL $QQQ -- https://t.co/t1eDfKHJnk https://t.co/BAW3RAe7SC
+Our $SQQQ Hedge is Up 314% and Our Futures Are Up $4,850, You're Welcome! $AAPL -- https://t.co/eUQ2kCkCOY https://t.co/Yk98oyqMZl
+"TURN YOUR \U0001f4b2500 INTO \U0001f4b25,000$\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE \u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TWTR $AAPL $LNKD
+https://t.co/euJFNQX1g4"
+"TURN YOUR \U0001f4b2500 INTO \U0001f4b25,000$\U0001f4a5
+
+JOIN #TEAMBILLIONAIRE \u2935
+\U0001f4e7 pennystockhotline@gmail.com
+
+#PENNYSTOCKS $TALK $PPPI $AAPL https://t.co/oSn11kxftM"
+Bears today. We getting paid! $AAPL $TWTR $BWLD $NFLX https://t.co/CCi0S3skJJ
+"Apple has $233 billion in cash. It could buy all
+
+\u2014@NFL teams
+\u2014@NBA teams
+\u2014@MLB teams
+\u2014@NHL teams
+
+...and still have $80 billion left. $AAPL"
+Are you in Sync with the market? https://t.co/ZtHHCrSAf8 #stocks #finance #investing #trading $AAPL $LNKD $NFLX $GOOGL $FB
+The Last Time These Insiders Purchased This Stock It Sky Rocketed 1000%+ https://t.co/bmNAHBoQBD $DIA $QQQ $SPY $GOOG $AAPL $BAC $TWTR $FB
+"This Hacker Made Amazon\u2019s Alexa, Google Now, and Siri Command One Another
+https://t.co/YXP3yqmf4H $AAPL $AMZN $GOOG https://t.co/NG7r6qgfRt"
+"Over the last 3 years, the top 14 automakers upped their combined R&D spend by $192 million.
+
+$AAPL upped R&D spend by $5 billion.
+
+- MS"
+Volatility can be your friend. https://t.co/aHz2r8HHD2 #stocks #trading #investing #financials #learntodaytrade $FB $AAPL $LNKD $NFLX
+"PERCENTAGE of Apple's Revenues:
+FY 2006:
+iPod 40%
+Mac 38%
+Services 10%
+Others 12%
+
+FY 2015:
+iPhone 66%
+Mac 11%
+iPad 10%
+Others 13%
+
+$AAPL"
+Apple recovered $40 million worth of gold from recycled iPhones, iPads & Macs in 2015. https://t.co/XPBWlM6cBs $AAPL https://t.co/P0LMSRw7Ot
+"Apple's iPhone sales sink for 1st time ever last quarter
+https://t.co/TAKjUwl4Yc @DavidGoldmanCNN @cnntech $AAPL https://t.co/OrDp4BDpsD"
+$BAC is down 5% since our article was posted on Friday https://t.co/al8AgaSsiI $DIA $QQQ $SPY $AAPL $GOOG $FB $TWTR $STUDY $NFLX $LNKD $IBM
+Ben Franklin: The First Proponent Of Dividend Growth Investing? https://t.co/dx7FE2G9AH $AAPL $ACN $AL $BEN $CSV $HON $IJR $JNJ $JWN $PEGI
+$5,000 Friday the 13th - Yesterday's Futures Trades Pay Off Nicely $USO $SPY $AAPL -- https://t.co/3RUEjAq1bO https://t.co/2L7cdebTlT
+I DON'T SEE ANY BUBBLE RIGHT NOW , I SWEAR ! $SPX $SPY $DIA $DJI $AAPL $VIX $TVIX $C $BAC $GM $GE $FB #STOCKMARKET https://t.co/E5954RIpC7
+Terrible $AAPL quarter, finally. On the way to becoming $NOK. Tech is mean reverting, today's leaders are almost always tomorrow's laggards.
+The iPhone 7S could look radically different from the iPhones of today https://t.co/eQxUMAZ4eM $AAPL https://t.co/HIH3QqKpIC
+"No Bull: The Evidence
+https://t.co/Md2SNpjdwd
+$SPX $MSFT $GOOGL $AAPL $NFLX $AMZN $FB $DIS $V $BAC $GS $WMT $SBUX https://t.co/1oISHNX4cJ"
+The iPhone 7S could look radically different from the iPhones of today https://t.co/KgeVSjmcGe $AAPL https://t.co/7hFtg37oJu
+There was a 3rd Apple founder, Ronald Wayne, who sold his 10% stake for $800 in 1976. Today his share would've been worth $65 Billion. $AAPL
+Twitter Stock Set to Breakout Soon https://t.co/u4V6ChhpOW $TWTR $DIA $QQQ $SPY $AAPL $GLD $GDX $NUGT $DUST $BAC $GOOG $FB $STUDY $NFLX $IBM
+Alibaba Stock Price Breaks The 50 Day Moving Average https://t.co/ABOVWI6j2G $BABA $AAPL $YHOO $COST $UWTI $CSC $MON https://t.co/VlWGDxrQXh
+I still can\u2019t shake the feeling that $AAPL is slowly taking themselves private. https://t.co/XIAMvppDWh https://t.co/kdMGCGbMaJ
+$SPX ROADMAP 2016 #STOCKMARKET $INTC $F $SPY $AAPL $AMZN $C $VIX $FB $TWTR $GOOGL $UVXY $FAZ $FEZ $MSFT $GS $BAC $AA https://t.co/owuQ9awcDw
+"Want to know why $GOOG is so impressive and why $AAPL is so fucked? Read this years founders' letter from $GOOG:
+
+https://t.co/LiBjGZwyKw"
+"GET READY. Here are the companies reporting earnings next week: https://t.co/NXptPkQX70
+
+$AAPL $FB $TWTR $CMG $GILD https://t.co/tcIoCZdOZi"
+$SPX THIS TIME IT'S DIFFERENT! $SPY $DIA $SDOW $S $FAZ $FEZ $AAPL $MSFT $BAC $C $JPM $GS $SIRI $AMZN $F $VIX $TVIX https://t.co/pkYVgNKv3P
+$SPX ROADMAP 2016 #STOCKS $TVIX $VXX $VALE $AAPL $AKS $FCX $MSFT $AA $MU $VIX $SPX $SPY #TRADING $PCLN $SIRI $ MCD https://t.co/6UH5He38h1
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/s8iQOvPQeR $AAPL https://t.co/QmQROtQ9vY
+11/ For example, buy an Echo and see your behavior change. The future is happening, and $AAPL seems, to me, asleep.
+$RLYP $SPY $KORS $WDAY $MSFT $AAPL $QLIK $TIVO $NXPI $CPXX $AVGO $ZOES $LE $TICC $SLB $FCEL $VRA $MLNX $ASNA $ICPT https://t.co/LXMpz4rFG0
+#STOCKMARKET GRAVITY LESSONS: what goes up must come down $SPX $SPY $DIA $QQQ $TVIX $VIX $AAPL $C $FB $PCLN $BAC $F https://t.co/8HQHBEgSj5
+Should Icahn's exit or Buffett's entry affect your $AAPL judgment? The Big Name Effect. https://t.co/9Z2ok61MUh https://t.co/udAQLfdJFe
+Apple revenue drops 13 percent, ending 13 years of growth. Greater China was especially weak, down 26 percent. $AAPL https://t.co/q4ovXUenBU
+It was a $18 billion day for Apple. https://t.co/iRbGeoTmCJ $AAPL
+"Apple has $233 billion in cash. It could buy:
+
+Uber
+Tesla
+Twitter
+Airbnb
+Netflix
+Yahoo
+
+...and still have $18 billion left. $AAPL"
+#3 TOP 2111.05 #STOCKS #STOCKMARKET #TRADING $SPX $SPY $VIX $TVIX $AAPL $SIRI $C $BAC $JPM $AMZN $MSFT $FB $TWTR $F https://t.co/gSqmN0fVON
+Google #IO16: Android's failure to innovate hands a Apple free run at WWDC $GOOG $AAPL https://t.co/FTs9M8JD5g https://t.co/20uou1gUkW
+$SPX 2134.72..2116.48...2111.05 HOUSTON WE HAVE A PROBLEM ! #STOCKMARKET $VIX $SPY $DIA $AAPL $C $BAC $FB $VXX $MSFT https://t.co/du3QfPUM4Q
+top #earnings $FB $AAPL $AMZN $TWTR $CMG $F $GILD $LNKD $FCX $CELG $SWKS $JBLU $T $NXPI $BA https://t.co/lObOE0uRjZ https://t.co/94F6GJc3hE
+The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/ZVeQ9a4Yrh $AAPL https://t.co/2Ntpbxwlyo
+You do not want to miss this incredibly candid look into $AAPL w/ @tim_cook! Tune into @MadMoneyOnCNBC on @CNBC now! https://t.co/budv4qfvju
+Foxconn axes 60,000 jobs in one Chinese factory as robots take over: https://t.co/BnFdjGCmLf $AAPL https://t.co/WhRHer8jdN
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/nXmvK6PV7M https://t.co/MAcMz0iTg6
+Apple is about to report its worst quarter in 13 years on Tuesday https://t.co/NJ3hwunHCx $AAPL https://t.co/YLTmnpqNjI
+Everyone who wants an iPhone has one. $AAPL is now a consumer staple stock and will trade on replacement / shareholder yield.
+Financial Armageddon\u2019 is imminent, the next major crash will happen in 2016 $VXX $VIX $TVIX $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+"Apple is NO longer the largest US stock by market cap. Google is: https://t.co/i81Y83jQJC
+
+$GOOGL $AAPL https://t.co/cRCKRYBICS"
+Exclusive: Apple hires former Tesla VP Chris Porritt for \u2018special [car] project\u2019 https://t.co/7knsloxvJW $TSLA $AAPL https://t.co/X8cYztExoP
+$SPX on the top of downtrend Channel Be careful! #STOCKMARKET $SPY $AAPL $AMZN $TSLA $FB $QQQ $DIA $NFLX $PCLN $C $F https://t.co/UKZCyLYuBq
+UPDATE: Apple CEO Cook says in conference call that smartphone marker is 'currently not growing' $AAPL https://t.co/WeECmrdv1j
+In February Charlie Munger was asked why Berkshire owns $GM. The $AAPL stake isn't anymore complicated than this: https://t.co/Rwkb30OEgq
+Talking to @SquawkStreet about $AAPL & more at @NYSE ! https://t.co/m05b68VLMp
+iPhone sales sour #Apple's earning: https://t.co/962fj9SWsc $AAPL https://t.co/nz9FRK6sNK
+People aren\u2019t upgrading smartphones as quickly and that is bad for Apple https://t.co/EOEJPfNR8Z \U0001f513 $AAPL
+"$NXPI $JBLU $FCX $AAPL $CMG
+$TWTR $EBAY $BWLD $PNRA $CRUS
+$FB $FSLR $UPS $CELG $AMZN
+$LNKD $BIDU $SWKS $GILD $HELE https://t.co/rQUmhHgYn0"
+People mad that Icahn sold $AAPL without giving them the head\u2019s up - How much in commissions did you pay him this year?
+Cool stat: $AAPL's $46 billion loss in market cap overnight is greater than the market cap of 391 S&P 500 companies https://t.co/1ms1YZzTbP
+Apple. You've come a long way... https://t.co/WGvk8K8MYv $AAPL https://t.co/3Wo0hAwRAc
+"Someone is building the Internet's biggest list of stock market cliches and it's amazing: https://t.co/mIV169cF36
+
+$SPY $AAPL $EURUSD"
+JUST IN: Apple delays earnings release by one day, to April 26th after the bell. \u2022 $AAPL
+Apple's market value is down nearly three Twitters $AAPL $TWTR
+Trump warns of a tech bubble: https://t.co/6Ks1yTa4Zc $AAPL $FB $AMZN He's 100% right about this. https://t.co/dJgTLk5JOB
+Apple could sell its billionth iPhone in just a few months' time https://t.co/g6VYDFIE3d $AAPL https://t.co/jzucmxDYXe
+$SPX KEEP BLOWING #STOCKMARKET #BUBBLE #STOCKS $MSFT $GS $AAPL $SPY $DIA $DJI $C $SIRI $PCLN $BAC $JPM $VIX $TVIX https://t.co/GPFBb0uCLF
+Will Apple $AAPL fall from tree? 12-mo descending triangle. I've no interest to short it, but it will be wild ride https://t.co/AnjsIKmIHI
+Tim Cook shouldn't be doing TV w/out a new product. Looks desperate. Not a consumer-facing guy. $AAPL https://t.co/Z4UFSimTLg
+When will Apple will sell its billionth iPhone? It may be sooner than you think: https://t.co/5IaF018N1p $AAPL https://t.co/cCIgtKqWHA
+#Stockmarket downtrend continues next week $spx $spy $vix $tvix $dji $aapl $jpm $bac $c $msft $pcln $wmt $ba https://t.co/1TTlgnKnZc
+$AAPL https://t.co/AFANPYHnoq
+40 years ago this month, Apple co-founder Ronald Wayne sold his 10% stake in $AAPL for $800. Current value: $61 billion.
+Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/rXWwuyIooI https://t.co/TztgKCcWWy
+Apple's iBooks and iTunes Movies in China have been shut down after less than 7 months https://t.co/ZuGXZqSHma $AAPL https://t.co/1OHGC9YiUf
+Possible buy on $AAPL as it drops onto it's 9 DEMA support #1Broker #Bitcoin #Blockchain https://t.co/WWssD01joh https://t.co/jOKJyG9EaJ
+"Apple is down 7% after earnings.
+
+That's about $40 BILLION in market cap gone in 30 minutes. Poof.
+
+$AAPL: https://t.co/ggfmPjJjkW"
+B4 CRASH 2008 - Paulson's speech:" OUR FINANCIAL SYSTEM IS STRONG" $VXX $VIX $TVIX $UVXY $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F
+$ONCI is ready to RUN this week! #stockmarket #pennystocks #parabolic $CDNL $MGT $GOOGL $AAPL $TSLA $TWTR $ONCI https://t.co/wwqf0RNOix
+Apple could sell its billionth iPhone in just a few months' time https://t.co/u2qFZ440dH $AAPL https://t.co/8cAchiZ0vC
+The iPhone might radically change in 2017 $AAPL https://t.co/IXLdCfEdus https://t.co/GpdMvFZPjE
+"The growth of smartphones. On one graph.
+
+A great share via: https://t.co/2hAJlarjSM
+
+$AAPL $GOOGL $MSFT https://t.co/BAwQRvYzou"
+"$AAPL finished last quarter with $232 billion in cash, meanwhile Kanye running up debts making records for Tidal.
+
+Bro."
+Which is bullish for $AAPL if you know anything about $GS https://t.co/WWssD01joh https://t.co/CQk8iKMI7w
+"The tech stocks with the MOST revenue
+
+1. $AAPL
+2. $AMZN
+3. $MSFT
+
+Visual by @OphirGottlieb: https://t.co/GpZ5ct2z5r https://t.co/H6sNKdtBHd"
+