You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/20 00:40:05 UTC
[2/2] incubator-apex-malhar git commit: APEXMALHAR-2006 Simple Stream
API
APEXMALHAR-2006 Simple Stream API
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ab76dacd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ab76dacd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ab76dacd
Branch: refs/heads/master
Commit: ab76dacd3c63a9d1b6b335ca19918cffff1eb728
Parents: cf3bb7d
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu May 19 17:22:20 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu May 19 17:22:20 2016 -0700
----------------------------------------------------------------------
.../com/datatorrent/lib/algo/UniqueCounter.java | 2 +-
pom.xml | 3 +-
stream/pom.xml | 100 ++++
.../apex/malhar/stream/api/ApexStream.java | 261 ++++++++++
.../apex/malhar/stream/api/WindowedStream.java | 54 ++
.../malhar/stream/api/function/Function.java | 80 +++
.../malhar/stream/api/impl/ApexStreamImpl.java | 495 +++++++++++++++++++
.../apex/malhar/stream/api/impl/DagMeta.java | 200 ++++++++
.../malhar/stream/api/impl/IDGenerator.java | 44 ++
.../malhar/stream/api/impl/StreamFactory.java | 81 +++
.../api/operator/AnnonymousClassModifier.java | 129 +++++
.../api/operator/ByteArrayClassLoader.java | 48 ++
.../stream/api/operator/FunctionOperator.java | 378 ++++++++++++++
.../apex/malhar/stream/api/util/KeyedTuple.java | 32 ++
.../apex/malhar/stream/api/util/TupleUtil.java | 34 ++
.../stream/api/impl/ApexStreamImplTest.java | 122 +++++
.../stream/sample/ApplicationWithStreamAPI.java | 57 +++
.../sample/ApplicationWithStreamAPITest.java | 53 ++
.../LocalTestWithoutStreamApplication.java | 85 ++++
.../apex/malhar/stream/sample/MyStream.java | 43 ++
.../apex/malhar/stream/sample/MyStreamTest.java | 140 ++++++
.../malhar/stream/sample/TupleCollector.java | 79 +++
stream/src/test/resources/data/word.txt | 2 +
23 files changed, 2520 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
index 1b6d944..013c8bc 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
+++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java
@@ -83,7 +83,7 @@ public class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
@Override
public Unifier<HashMap<K, Integer>> getUnifier()
{
- UnifierHashMapSumKeys unifierHashMapSumKeys = new UnifierHashMapSumKeys<K, Integer>();
+ // Declare the unifier with its type arguments instead of the raw type to avoid raw-type/unchecked warnings.
+ UnifierHashMapSumKeys<K, Integer> unifierHashMapSumKeys = new UnifierHashMapSumKeys<>();
unifierHashMapSumKeys.setType(Integer.class);
return unifierHashMapSumKeys;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c0e98a3..b75fe13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,10 +188,11 @@
<profile>
<id>all-modules</id>
<modules>
+ <module>kafka</module>
+ <module>stream</module>
<module>benchmark</module>
<module>apps</module>
<module>samples</module>
- <module>kafka</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/pom.xml
----------------------------------------------------------------------
diff --git a/stream/pom.xml b/stream/pom.xml
new file mode 100755
index 0000000..0348236
--- /dev/null
+++ b/stream/pom.xml
@@ -0,0 +1,100 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-stream</artifactId>
+ <name>Apache Apex Malhar Stream API</name>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <!-- Publish tests jar -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-kafka</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>malhar-contrib</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apex-common</artifactId>
+ <version>${apex.core.version}</version>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>apex-engine</artifactId>
+ <version>${apex.core.version}</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
new file mode 100644
index 0000000..f97e8cc
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+/**
+ * Fluent interface for building a DAG as a chain of stream transformations.
+ * <p>
+ * Transformation methods add an operator to the underlying DAG and return a new stream
+ * of that operator's output.
+ * </p>
+ *
+ * @param <T> type of the tuples in this stream
+ */
+public interface ApexStream<T>
+{
+  /**
+   * Simple map transformation.<br>
+   * Adds an operator to the DAG that converts each tuple of type T to a tuple of type O.
+   *
+   * @param mapFunction map function
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mapFunction);
+
+  /**
+   * Simple map transformation.<br>
+   * Adds an operator to the DAG that converts each tuple of type T to a tuple of type O.
+   *
+   * @param name operator name
+   * @param mapFunction map function
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mapFunction);
+
+  /**
+   * Flat map transformation.<br>
+   * Adds an operator to the DAG that converts each tuple of type T to an {@link Iterable} of tuples of type O.
+   *
+   * @param flatten flat map function
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten);
+
+  /**
+   * Flat map transformation.<br>
+   * Adds an operator to the DAG that converts each tuple of type T to an {@link Iterable} of tuples of type O.
+   *
+   * @param name operator name
+   * @param flatten flat map function
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten);
+
+  /**
+   * Filter transformation.<br>
+   * Adds an operator to the DAG that filters out the tuples that do not satisfy the filter function.
+   *
+   * @param filter filter function
+   * @param <STREAM> type of the returned stream
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter);
+
+  /**
+   * Filter transformation.<br>
+   * Adds an operator to the DAG that filters out the tuples that do not satisfy the filter function.
+   *
+   * @param name operator name
+   * @param filter filter function
+   * @param <STREAM> type of the returned stream
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter);
+
+  /**
+   * Reduce transformation.<br>
+   * Adds an operator to the DAG that merges two tuples (t1, t2) into a new tuple of the same type.
+   *
+   * @param reduce reduce function
+   * @param <STREAM> type of the returned stream
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce);
+
+  /**
+   * Reduce transformation.<br>
+   * Adds an operator to the DAG that merges two tuples (t1, t2) into a new tuple of the same type.
+   *
+   * @param name operator name
+   * @param reduce reduce function
+   * @param <STREAM> type of the returned stream
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce);
+
+  /**
+   * Fold transformation.<br>
+   * Adds an operator to the DAG that merges each tuple of type T into an accumulated result of type O.
+   *
+   * @param initialValue initial value of the accumulated result
+   * @param fold fold function
+   * @param <O> result type
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold);
+
+  /**
+   * Fold transformation.<br>
+   * Adds an operator to the DAG that merges each tuple of type T into an accumulated result of type O.
+   *
+   * @param name operator name
+   * @param initialValue initial value of the accumulated result
+   * @param fold fold function
+   * @param <O> result type
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold);
+
+  /**
+   * Counts all tuples.
+   * NOTE(review): the default {@code ApexStreamImpl} currently throws {@link UnsupportedOperationException}.
+   *
+   * @param <STREAM> type of the returned stream
+   * @return new stream of Integer
+   */
+  <STREAM extends ApexStream<Integer>> STREAM count();
+
+  /**
+   * Counts tuples by key.<br>
+   * If the input is a {@code KeyedTuple}, the key is obtained from its {@code getKey()} method;<br>
+   * otherwise the tuple itself is used as the key.
+   * NOTE(review): the default {@code ApexStreamImpl} does not yet support keys (its UniqueCounter
+   * "needs to change ... to support keys") — confirm before relying on the KeyedTuple behavior.
+   *
+   * @param <STREAM> type of the returned stream
+   * @return new stream of Map
+   */
+  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey();
+
+  /**
+   * Counts tuples by an indexed key.
+   * NOTE(review): the default {@code ApexStreamImpl} currently throws {@link UnsupportedOperationException}.
+   *
+   * @param key index of the field in the tuple that is used as the key
+   * @param <STREAM> type of the returned stream
+   * @return new stream of Map
+   */
+  <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key);
+
+  /**
+   * Extends the DAG by adding one {@link Operator}.
+   *
+   * @param op operator added to the stream
+   * @param inputPort input port of {@code op} that is connected to the last exposed output port of this stream
+   * @param outputPort output port of {@code op} that the next operator will be connected to
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort);
+
+  /**
+   * Extends the DAG by adding one {@link Operator}.
+   *
+   * @param opName operator name
+   * @param op operator added to the stream
+   * @param inputPort input port of {@code op} that is connected to the last exposed output port of this stream
+   * @param outputPort output port of {@code op} that the next operator will be connected to
+   * @param <O> type of the output
+   * @param <STREAM> type of the returned stream
+   * @return new stream of type O
+   */
+  <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort);
+
+  /**
+   * Unions multiple streams into one.
+   * NOTE(review): the default {@code ApexStreamImpl} currently throws {@link UnsupportedOperationException}.
+   *
+   * @param others other streams
+   * @param <STREAM> type of the returned stream
+   * @return new stream of the same type
+   */
+  <STREAM extends ApexStream<T>> STREAM union(ApexStream<T>... others);
+
+  /**
+   * Adds a stdout console output operator.
+   *
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM print();
+
+  /**
+   * Adds a stderr console output operator.
+   * NOTE(review): the default {@code ApexStreamImpl} currently throws {@link UnsupportedOperationException}.
+   *
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM printErr();
+
+  /**
+   * Sets an attribute value.<br>
+   * If it is a {@link DAGContext DAG attribute}, it is applied to the whole DAG.<br>
+   * If it is an {@link OperatorContext operator attribute}, it is applied to the last connected operator.<br>
+   * If it is a {@link PortContext input port attribute}, it is applied to the input port of the last connected stream.<br>
+   * If it is a {@link PortContext output port attribute}, it is applied to the output port of the last connected stream.<br>
+   * If it is both an input and an output {@link PortContext port attribute}, it is applied to the last connected stream.
+   *
+   * @param attribute the {@link Attribute} to set
+   * @param value value of the attribute
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM with(Attribute attribute, Object value);
+
+  /**
+   * Sets an attribute at the DAG level.
+   *
+   * @param attribute the {@link Attribute} to set
+   * @param value value of the attribute
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM setGlobalAttribute(Attribute attribute, Object value);
+
+  /**
+   * Sets the {@link DAG.Locality}.
+   * NOTE(review): presumably applied to the last connected stream — confirm against the implementation.
+   *
+   * @param locality the locality to use
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM with(DAG.Locality locality);
+
+  /**
+   * Sets a property value on the last connected operator.
+   *
+   * @param propName property name
+   * @param value value of the property
+   * @param <STREAM> type of the returned stream
+   * @return this stream
+   */
+  <STREAM extends ApexStream<T>> STREAM with(String propName, Object value);
+
+  /**
+   * Creates a DAG from this stream.
+   *
+   * @return the created {@link DAG}
+   */
+  DAG createDag();
+
+  /**
+   * Populates an existing DAG from this stream.
+   *
+   * @param dag the {@link DAG} to populate
+   */
+  void populateDag(DAG dag);
+
+  /**
+   * Runs the stream application in local (embedded) mode.
+   * <ul>
+   * <li>In async mode, the method returns immediately and the DAG runs for {@code duration} milliseconds.</li>
+   * <li>In sync mode, the method blocks for {@code duration} milliseconds while the DAG runs.</li>
+   * </ul>
+   * If {@code duration} is negative, the DAG runs until it is killed.
+   *
+   * @param async {@code true} to run in async mode, {@code false} to run in sync mode
+   * @param duration how long to run the DAG, in milliseconds; negative to run until killed
+   * @param exitCondition NOTE(review): presumably a condition that ends the run early once it returns
+   *        {@code true}; whether it may be {@code null} is up to the implementation — confirm
+   */
+  void runEmbedded(boolean async, long duration, Callable<Boolean> exitCondition);
+
+  /**
+   * Submits the application to a cluster.
+   */
+  void run();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
new file mode 100644
index 0000000..a3bc846
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api;
+
+/**
+ * <p>
+ * A stream with windowed transformations.
+ * </p>
+ * <p>
+ * <b>Transformation types:</b>
+ * </p>
+ * <ul>
+ * <li>Combine</li>
+ * <li>Group</li>
+ * <li>Keyed Combine</li>
+ * <li>Keyed Group</li>
+ * <li>Join</li>
+ * <li>CoGroup</li>
+ * </ul>
+ * <p>
+ * <b>Features supported with windowed transformations:</b>
+ * </p>
+ * <ul>
+ * <li>Watermark - ingestion time watermark / logical tuple watermark</li>
+ * <li>Early triggers - how frequently to emit real-time partial results</li>
+ * <li>Late triggers - when to emit updated results for tuples that arrive after the watermark</li>
+ * <li>Customizable trigger behavior - what to do when a trigger fires</li>
+ * <li>Spool window state - in-memory window state can be spooled to disk when it is full</li>
+ * <li>Three accumulation modes: ignore, accumulation, accumulation + delta</li>
+ * <li>Window support: non-mergeable windows (fixed window, sliding window) and mergeable windows
+ * (session window), based on three different tuple times</li>
+ * <li>Tuple time support: event time, system time, ingestion time</li>
+ * </ul>
+ * <p>
+ * NOTE(review): this interface currently declares no methods beyond {@link ApexStream}; the list above
+ * presumably describes planned behavior — confirm.
+ * </p>
+ *
+ * @param <T> output tuple type
+ */
+public interface WindowedStream<T> extends ApexStream<T>
+{
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
new file mode 100644
index 0000000..1167744
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/function/Function.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.function;
+
+/**
+ * The top-level marker interface for the user functions passed to the stream API.
+ */
+public interface Function
+{
+  /**
+   * Marker interface: if a {@link Function} also implements this interface,
+   * the state of the function will be checkpointed.
+   */
+  interface Stateful
+  {
+  }
+
+  /**
+   * Defines a one-input, one-output transformation.
+   *
+   * @param <I> input tuple type
+   * @param <O> output tuple type
+   */
+  interface MapFunction<I, O> extends Function
+  {
+    /**
+     * Transforms one input tuple.
+     *
+     * @param input the input tuple
+     * @return the transformed tuple
+     */
+    O f(I input);
+  }
+
+  /**
+   * Defines a reduce transformation that merges two tuples of the same type into one.
+   *
+   * @param <T> tuple type
+   */
+  interface ReduceFunction<T> extends Function
+  {
+    /**
+     * Merges two tuples.
+     *
+     * @param t1 first tuple
+     * @param t2 second tuple
+     * @return the merged tuple
+     */
+    T reduce(T t1, T t2);
+  }
+
+  /**
+   * Defines a fold transformation that merges an input tuple into an accumulated result.
+   *
+   * @param <I> input tuple type
+   * @param <O> accumulated result type
+   */
+  interface FoldFunction<I, O> extends Function
+  {
+    /**
+     * Merges {@code input} into the accumulated result.
+     *
+     * @param input the input tuple
+     * @param output the accumulated result so far
+     * @return the new accumulated result
+     */
+    O fold(I input, O output);
+  }
+
+  /**
+   * Defines a flat-map transformation: each input tuple is mapped to an {@link Iterable} of output tuples.
+   *
+   * @param <I> input tuple type
+   * @param <O> output tuple type
+   */
+  interface FlatMapFunction<I, O> extends MapFunction<I, Iterable<O>>
+  {
+  }
+
+  /**
+   * Defines a filter transformation: {@code f} returns a {@link Boolean} for each tuple.
+   * NOTE(review): presumably tuples for which {@code f} returns {@code true} are kept — confirm
+   * against the filter operator.
+   *
+   * @param <T> tuple type
+   */
+  interface FilterFunction<T> extends MapFunction<T, Boolean>
+  {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
new file mode 100644
index 0000000..0e47727
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.operator.FunctionOperator;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Default implementation of the {@link ApexStream} interface.
+ * It builds the DAG (execution plan) from calls to the stream API.
+ */
+public class ApexStreamImpl<T> implements ApexStream<T>
+{
+
+ private static Set<Attribute<?>> OPERATOR_ATTRIBUTES;
+
+ private static Set<Attribute<?>> DAG_ATTRIBUTES;
+
+ private static Set<Attribute<?>> INPUT_ATTRIBUTES;
+
+ private static Set<Attribute<?>> OUTPUT_ATTRIBUTES;
+
+ static {
+
+ OPERATOR_ATTRIBUTES = new HashSet<>();
+ DAG_ATTRIBUTES = new HashSet<>();
+ INPUT_ATTRIBUTES = new HashSet<>();
+ OUTPUT_ATTRIBUTES = new HashSet<>();
+
+ try {
+ for (Field field : Context.OperatorContext.class.getDeclaredFields()) {
+ if (field.getType() == Attribute.class) {
+ OPERATOR_ATTRIBUTES.add((Attribute)field.get(Context.OperatorContext.class));
+ }
+ }
+
+ for (Field field : Context.DAGContext.class.getDeclaredFields()) {
+ if (field.getType() == Attribute.class) {
+ DAG_ATTRIBUTES.add((Attribute)field.get(Context.DAGContext.class));
+ }
+ }
+ } catch (IllegalAccessException e) {
+ //Ignore here
+ }
+
+ INPUT_ATTRIBUTES.add(Context.PortContext.PARTITION_PARALLEL);
+ INPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
+ INPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
+ INPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);
+
+
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.QUEUE_CAPACITY);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.BUFFER_MEMORY_MB);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.SPIN_MILLIS);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.UNIFIER_SINGLE_FINAL);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.IS_OUTPUT_UNIFIED);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.AUTO_RECORD);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.STREAM_CODEC);
+ OUTPUT_ATTRIBUTES.add(Context.PortContext.TUPLE_CLASS);
+
+ }
+
+  /**
+   * An extension point of a stream. It holds:
+   * <ul>
+   * <li>the DAG node of the most recently added operator;</li>
+   * <li>the output port that the next operator attaches to;</li>
+   * <li>the (output, input) port pair of the stream that fed that operator.</li>
+   * </ul>
+   *
+   * @param <T> type of the tuples emitted on {@link #getLastOutput()}
+   */
+  public static class Brick<T>
+  {
+    /** DAG node of the most recently added operator. */
+    private DagMeta.NodeMeta nodeMeta;
+
+    /** Output port that downstream operators are attached to. */
+    private Operator.OutputPort<T> lastOutput;
+
+    /** Ports of the stream feeding {@link #nodeMeta}; stays {@code null} when that node had no upstream. */
+    private Pair<Operator.OutputPort, Operator.InputPort> lastStream;
+
+    public Operator.OutputPort<T> getLastOutput()
+    {
+      return this.lastOutput;
+    }
+
+    public Pair<Operator.OutputPort, Operator.InputPort> getLastStream()
+    {
+      return this.lastStream;
+    }
+
+    public void setLastOutput(Operator.OutputPort<T> output)
+    {
+      this.lastOutput = output;
+    }
+
+    public void setLastStream(Pair<Operator.OutputPort, Operator.InputPort> stream)
+    {
+      this.lastStream = stream;
+    }
+  }
+
+  /**
+   * Graph behind the stream; streams derived from this one via addOperator share the same instance.
+   */
+  private DagMeta graph;
+
+  /** When non-null, addOperator calls are forwarded to this stream. */
+  private ApexStream<T> delegator;
+
+  /**
+   * Current extension point. The stream supports only a single extension point, although
+   * multiple downstream operators may be connected to it.
+   */
+  private Brick<T> lastBrick;
+
+  public Brick<T> getLastBrick()
+  {
+    return this.lastBrick;
+  }
+
+  public void setLastBrick(Brick<T> brick)
+  {
+    this.lastBrick = brick;
+  }
+
+  /** Creates a stream backed by a new, empty DAG and no extension point. */
+  public ApexStreamImpl()
+  {
+    this(new DagMeta());
+  }
+
+  /**
+   * Creates a stream that forwards addOperator calls to {@code apexStream}. When the delegate is
+   * an {@code ApexStreamImpl}, its graph and extension point are shared as well.
+   */
+  public ApexStreamImpl(ApexStream<T> apexStream)
+  {
+    this.delegator = apexStream;
+    if (apexStream instanceof ApexStreamImpl) {
+      ApexStreamImpl<T> source = (ApexStreamImpl<T>)apexStream;
+      this.graph = source.graph;
+      this.lastBrick = source.lastBrick;
+    }
+  }
+
+  /** Creates a stream on {@code graph} with no extension point. */
+  public ApexStreamImpl(DagMeta graph)
+  {
+    this(graph, null);
+  }
+
+  /** Creates a stream on {@code graph} whose extension point is {@code lastBrick}. */
+  public ApexStreamImpl(DagMeta graph, Brick<T> lastBrick)
+  {
+    this.graph = graph;
+    this.lastBrick = lastBrick;
+  }
+
+  /** Same as {@link #map(String, Function.MapFunction)}, using {@code mf.toString()} as the operator name. */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM map(Function.MapFunction<T, O> mf)
+  {
+    String defaultName = mf.toString();
+    return map(defaultName, mf);
+  }
+
+  /** Wraps {@code mf} in a map operator and appends it to this stream under {@code name}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM map(String name, Function.MapFunction<T, O> mf)
+  {
+    FunctionOperator.MapFunctionOperator<T, O> mapOperator = new FunctionOperator.MapFunctionOperator<>(mf);
+    return (STREAM)addOperator(name, mapOperator, mapOperator.input, mapOperator.output);
+  }
+
+  /** Same as {@link #flatMap(String, Function.FlatMapFunction)}, using {@code flatten.toString()} as the operator name. */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM flatMap(Function.FlatMapFunction<T, O> flatten)
+  {
+    String defaultName = flatten.toString();
+    return flatMap(defaultName, flatten);
+  }
+
+  /** Wraps {@code flatten} in a flat-map operator and appends it to this stream under {@code name}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM flatMap(String name, Function.FlatMapFunction<T, O> flatten)
+  {
+    FunctionOperator.FlatMapFunctionOperator<T, O> flatMapOperator = new FunctionOperator.FlatMapFunctionOperator<>(flatten);
+    return (STREAM)addOperator(name, flatMapOperator, flatMapOperator.input, flatMapOperator.output);
+  }
+
+  /** Same as {@link #filter(String, Function.FilterFunction)}, using {@code filter.toString()} as the operator name. */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM filter(Function.FilterFunction<T> filter)
+  {
+    String defaultName = filter.toString();
+    return filter(defaultName, filter);
+  }
+
+  /** Wraps {@code filter} in a filter operator and appends it to this stream under {@code name}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<T>> STREAM filter(String name, Function.FilterFunction<T> filter)
+  {
+    FunctionOperator.FilterFunctionOperator<T> filterOperator = new FunctionOperator.FilterFunctionOperator<>(filter);
+    return (STREAM)addOperator(name, filterOperator, filterOperator.input, filterOperator.output);
+  }
+
+  /** Same as {@link #reduce(String, Function.ReduceFunction)}, using {@code reduce.toString()} as the operator name. */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM reduce(Function.ReduceFunction<T> reduce)
+  {
+    String defaultName = reduce.toString();
+    return reduce(defaultName, reduce);
+  }
+
+  /** Wraps {@code reduce} in a reduce operator and appends it to this stream under {@code name}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<T>> STREAM reduce(String name, Function.ReduceFunction<T> reduce)
+  {
+    FunctionOperator.ReduceFunctionOperator<T> reduceOperator = new FunctionOperator.ReduceFunctionOperator<>(reduce);
+    return (STREAM)addOperator(name, reduceOperator, reduceOperator.input, reduceOperator.output);
+  }
+
+  /** Same as {@link #fold(String, Object, Function.FoldFunction)}, using {@code fold.toString()} as the operator name. */
+  @Override
+  public <O, STREAM extends ApexStream<O>> STREAM fold(O initialValue, Function.FoldFunction<T, O> fold)
+  {
+    String defaultName = fold.toString();
+    return fold(defaultName, initialValue, fold);
+  }
+
+  /** Wraps {@code fold} (seeded with {@code initialValue}) in a fold operator and appends it under {@code name}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <O, STREAM extends ApexStream<O>> STREAM fold(String name, O initialValue, Function.FoldFunction<T, O> fold)
+  {
+    FunctionOperator.FoldFunctionOperator<T, O> foldOperator = new FunctionOperator.FoldFunctionOperator<>(fold, initialValue);
+    return (STREAM)addOperator(name, foldOperator, foldOperator.input, foldOperator.output);
+  }
+
+  /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public <STREAM extends ApexStream<Integer>> STREAM count()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented yet; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey(int key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Appends a cumulative {@link UniqueCounter} named "CounterByKey" to this stream.
+   * TODO: UniqueCounter needs to be changed to support keys.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <STREAM extends ApexStream<Map<Object, Integer>>> STREAM countByKey()
+  {
+    UniqueCounter<Object> counter = new UniqueCounter<>();
+    counter.setCumulative(true);
+    Operator.InputPort<T> counterInput = (Operator.InputPort<T>)counter.data;
+    Operator.OutputPort<? extends Map<Object, Integer>> counterOutput = counter.count;
+    return (STREAM)addOperator("CounterByKey", counter, counterInput, counterOutput);
+  }
+
+ @Override
+ public <O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+ {
+ return addOperator(op.toString(), op, inputPort, outputPort);
+ }
+
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <O, STREAM extends ApexStream<O>> STREAM addOperator(String opName, Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort)
+ {
+
+ if (delegator != null) {
+ ApexStreamImpl<O> apexStream = delegator.addOperator(opName, op, inputPort, outputPort);
+ try {
+ return (STREAM)this.getClass().getConstructor(ApexStream.class).newInstance(apexStream);
+ } catch (Exception e) {
+ throw new RuntimeException("You have to override the default constructor with ApexStreamImpl as delegator");
+ }
+ }
+
+ checkArguments(op, inputPort, outputPort);
+
+ DagMeta.NodeMeta nm = null;
+
+ if (lastBrick == null) {
+ nm = graph.addNode(opName, op, null, null, inputPort);
+ } else {
+
+ nm = graph.addNode(opName, op, lastBrick.nodeMeta, lastBrick.lastOutput, inputPort);
+ }
+
+ Brick<O> newBrick = new Brick<>();
+ newBrick.nodeMeta = nm;
+ newBrick.setLastOutput(outputPort);
+ if (lastBrick != null) {
+ newBrick.lastStream = Pair.<Operator.OutputPort, Operator.InputPort>of(lastBrick.lastOutput, inputPort);
+ }
+
+ return (STREAM)new ApexStreamImpl<>(this.graph, newBrick);
+ }
+
+  /**
+   * Checks that {@code op} is non-null and that every non-null port is the value of one of
+   * {@code op}'s public transient fields.
+   *
+   * @throws IllegalArgumentException if {@code op} is null or a port is not owned by {@code op}
+   */
+  private void checkArguments(Operator op, Operator.InputPort inputPort, Operator.OutputPort outputPort)
+  {
+    if (op == null) {
+      throw new IllegalArgumentException("Operator can not be null");
+    }
+
+    // A null port has nothing to verify, so it counts as owned
+    boolean inputOwned = (inputPort == null);
+    boolean outputOwned = (outputPort == null);
+
+    for (Field field : op.getClass().getFields()) {
+      int mods = field.getModifiers();
+      boolean portCandidate = Modifier.isPublic(mods) && Modifier.isTransient(mods);
+      if (portCandidate) {
+        Object value;
+        try {
+          value = field.get(op);
+        } catch (IllegalAccessException e) {
+          // An inaccessible field cannot hold a usable port
+          value = null;
+        }
+        inputOwned |= (value == inputPort);
+        outputOwned |= (value == outputPort);
+      }
+    }
+
+    if (!(inputOwned && outputOwned)) {
+      throw new IllegalArgumentException("Input port " + inputPort + " and/or Output port " + outputPort + " is/are not owned by Operator " + op);
+    }
+  }
+
+  /**
+   * Merging streams is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <STREAM extends ApexStream<T>> STREAM union(ApexStream<T>... others)
+  {
+    UnsupportedOperationException notYet = new UnsupportedOperationException();
+    throw notYet;
+  }
+
+  /**
+   * Attaches a {@link ConsoleOutputOperator} sink to this stream under a UUID-based name.
+   *
+   * <p>The sink is registered in the shared graph by {@code addOperator}. The returned stream is this one,
+   * so further calls keep chaining from the same point, not from the console sink.</p>
+   *
+   * @return this stream
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStreamImpl<T> print()
+  {
+    ConsoleOutputOperator console = new ConsoleOutputOperator();
+    String sinkName = IDGenerator.generateOperatorIDWithUUID(console.getClass());
+    Operator.InputPort<T> consoleInput = (Operator.InputPort<T>)console.input;
+    addOperator(sinkName, console, consoleInput, null);
+    return this;
+  }
+
+  /**
+   * Printing to stderr is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> printErr()
+  {
+    // TODO: ConsoleOutputOperator has to support stderr first
+    UnsupportedOperationException notYet = new UnsupportedOperationException();
+    throw notYet;
+  }
+
+  /**
+   * Sets an attribute on the last operator of this stream and/or on the last stream connection.
+   *
+   * <ul>
+   * <li>operator attributes are recorded on the last operator;</li>
+   * <li>input port attributes are recorded on the last operator's input port of the last connection;</li>
+   * <li>output port attributes are recorded on the upstream (parent) operator that owns the output port of the
+   * last connection, because {@code DagMeta} applies output port attributes from the owning node.</li>
+   * </ul>
+   * The attribute is also always passed to {@link #setGlobalAttribute(Attribute, Object)}.
+   *
+   * @param attribute the attribute to set
+   * @param value its value
+   * @return this stream
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(Attribute attribute, Object value)
+  {
+    if (OPERATOR_ATTRIBUTES.contains(attribute)) {
+      lastBrick.nodeMeta.operatorAttributes.add(Pair.of(attribute, value));
+    }
+
+    if (INPUT_ATTRIBUTES.contains(attribute) && lastBrick.lastStream != null) {
+      Operator.InputPort inputPort = lastBrick.lastStream.getRight();
+      List<Pair<Attribute, Object>> attrs = lastBrick.nodeMeta.inputPortAttributes.get(inputPort);
+      if (attrs == null) {
+        attrs = new LinkedList<>();
+        lastBrick.nodeMeta.inputPortAttributes.put(inputPort, attrs);
+      }
+      attrs.add(Pair.of(attribute, value));
+    }
+
+    if (OUTPUT_ATTRIBUTES.contains(attribute) && lastBrick.lastStream != null) {
+      Operator.OutputPort outputPort = lastBrick.lastStream.getLeft();
+      for (DagMeta.NodeMeta parent : lastBrick.nodeMeta.getParent()) {
+        // only the parent that actually owns the output port gets the attribute
+        if (!parent.getNodeStreams().containsKey(outputPort)) {
+          continue;
+        }
+        List<Pair<Attribute, Object>> attrs = parent.outputPortAttributes.get(outputPort);
+        if (attrs == null) {
+          attrs = new LinkedList<>();
+          parent.outputPortAttributes.put(outputPort, attrs);
+        }
+        attrs.add(Pair.of(attribute, value));
+      }
+    }
+
+    // NOTE(review): every attribute, including operator/port ones, is also recorded as a DAG attribute here;
+    // confirm whether this should only happen for attributes that are none of the above kinds.
+    setGlobalAttribute(attribute, value);
+
+    return this;
+  }
+
+  /**
+   * Records a DAG-level attribute in the shared graph metadata.
+   *
+   * @param attribute the attribute to record
+   * @param value its value
+   * @return this stream
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> setGlobalAttribute(Attribute attribute, Object value)
+  {
+    Pair<Attribute, Object> entry = Pair.of(attribute, value);
+    this.graph.dagAttributes.add(entry);
+    return this;
+  }
+
+  /**
+   * Sets the locality of the last stream connection (upstream output port to this operator).
+   * Does nothing when this stream has no incoming connection yet.
+   *
+   * @param locality the locality to apply
+   * @return this stream
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(DAG.Locality locality)
+  {
+    if (lastBrick.lastStream == null) {
+      return this;
+    }
+    Operator.OutputPort upstreamPort = lastBrick.lastStream.getLeft();
+    for (DagMeta.NodeMeta parent : lastBrick.nodeMeta.getParent()) {
+      Pair<List<Operator.InputPort>, DAG.Locality> connection = parent.getNodeStreams().get(upstreamPort);
+      if (connection == null) {
+        continue;
+      }
+      connection.setValue(locality);
+    }
+    return this;
+  }
+
+  /**
+   * Sets a bean property on the last operator of this stream via {@code BeanUtils.setProperty}.
+   *
+   * @param propName name of the property
+   * @param value value to set
+   * @return this stream
+   * @throws RuntimeException wrapping any failure while resolving the operator or setting the property
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public ApexStream<T> with(String propName, Object value)
+  {
+    try {
+      Operator target = lastBrick.nodeMeta.getOperator();
+      BeanUtils.setProperty(target, propName, value);
+    } catch (Exception cause) {
+      throw new RuntimeException(cause);
+    }
+    return this;
+  }
+
+
+  /**
+   * Builds a fresh {@link LogicalPlan} from the graph behind this stream.
+   *
+   * @return the populated DAG
+   */
+  @Override
+  public DAG createDag()
+  {
+    DAG plan = new LogicalPlan();
+    populateDag(plan);
+    return plan;
+  }
+
+  /**
+   * Adds every operator, stream and recorded attribute of the graph behind this stream to {@code dag}.
+   *
+   * @param dag the DAG to populate
+   */
+  @Override
+  public void populateDag(DAG dag)
+  {
+    this.graph.buildDAG(dag);
+  }
+
+  /**
+   * Runs the application in-process using {@link LocalMode}.
+   *
+   * @param async if true, start with {@code runAsync()} and return immediately
+   * @param duration passed to {@code run(long)} when non-negative (presumably milliseconds — confirm against
+   *                 {@code LocalMode.Controller}); a negative value runs without a time limit
+   * @param exitCondition installed on the controller only when it is a {@link StramLocalCluster}
+   */
+  @Override
+  public void runEmbedded(boolean async, long duration, Callable<Boolean> exitCondition)
+  {
+    LocalMode localMode = LocalMode.newInstance();
+    populateDag(localMode.getDAG());
+    LocalMode.Controller controller = localMode.getController();
+    if (controller instanceof StramLocalCluster) {
+      ((StramLocalCluster)controller).setExitCondition(exitCondition);
+    }
+
+    if (async) {
+      controller.runAsync();
+    } else if (duration < 0) {
+      controller.run();
+    } else {
+      controller.run(duration);
+    }
+  }
+
+
+  /**
+   * Submitting to a cluster is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void run()
+  {
+    // TODO: needs an API to submit the StreamingApplication to a cluster
+    UnsupportedOperationException notYet = new UnsupportedOperationException();
+    throw notYet;
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
new file mode 100644
index 0000000..15417b5
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Graph data structure for DAG
+ * With this data structure, the framework can do lazy load and optimization
+ */
+public class DagMeta
+{
+
+  /** Nodes added without a parent; {@link #buildDAG(DAG)} starts its traversal from these. */
+  private List<NodeMeta> heads = new LinkedList<>();
+
+  /**
+   * DAG-level attributes recorded by the stream API.
+   * NOTE(review): buildDAG never applies these to the DAG; confirm whether that is intended.
+   */
+  List<Pair<Attribute, Object>> dagAttributes = new LinkedList<>();
+
+  /**
+   * One operator of the graph, its outgoing streams, its attributes and its neighbours.
+   */
+  public static class NodeMeta
+  {
+
+    private String nodeName;
+
+    private Operator operator;
+
+    List<Pair<Attribute, Object>> operatorAttributes = new LinkedList<>();
+
+    // output port -> (downstream input ports connected to it, locality of that stream)
+    private Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> nodeStreams = new HashMap<>();
+
+    Map<Operator.OutputPort, List<Pair<Attribute, Object>>> outputPortAttributes = new HashMap<>();
+
+    Map<Operator.InputPort, List<Pair<Attribute, Object>>> inputPortAttributes = new HashMap<>();
+
+    private Set<Operator.InputPort> operatorInputs = new HashSet<>();
+
+    private List<NodeMeta> children = new LinkedList<>();
+
+    private List<NodeMeta> parent = new LinkedList<>();
+
+    public List<NodeMeta> getParent()
+    {
+      return parent;
+    }
+
+    public List<NodeMeta> getChildren()
+    {
+      return children;
+    }
+
+    public String getNodeName()
+    {
+      return nodeName;
+    }
+
+    public Operator getOperator()
+    {
+      return operator;
+    }
+
+    public Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> getNodeStreams()
+    {
+      return nodeStreams;
+    }
+
+    /**
+     * Creates the node and discovers the operator's ports: every public transient field whose type is an
+     * output port gets an (initially unconnected) stream entry, and every such input port field is recorded.
+     */
+    public NodeMeta(Operator operator, String nodeName)
+    {
+
+      this.nodeName = nodeName;
+
+      this.operator = operator;
+
+      for (Field field : this.operator.getClass().getFields()) {
+        int modifier = field.getModifiers();
+        if (Modifier.isPublic(modifier) && Modifier.isTransient(modifier) &&
+            Operator.OutputPort.class.isAssignableFrom(field.getType())) {
+          try {
+            nodeStreams.put((Operator.OutputPort)field.get(operator), MutablePair.<List<Operator.InputPort>, DAG.Locality>of(new LinkedList<Operator.InputPort>(), null));
+          } catch (IllegalAccessException e) {
+            // the field cannot be read, so it cannot be used as a port; skip it
+          }
+        }
+        if (Modifier.isPublic(modifier) && Modifier.isTransient(modifier) &&
+            Operator.InputPort.class.isAssignableFrom(field.getType())) {
+          try {
+            operatorInputs.add((Operator.InputPort)field.get(operator));
+          } catch (IllegalAccessException e) {
+            // the field cannot be read, so it cannot be used as a port; skip it
+          }
+        }
+      }
+    }
+
+  }
+
+  public DagMeta()
+  {
+
+  }
+
+  /**
+   * Builds a new {@link LogicalPlan} containing this graph.
+   */
+  public DAG buildDAG()
+  {
+    DAG dag = new LogicalPlan();
+    buildDAG(dag);
+    return dag;
+  }
+
+  /**
+   * Adds every node reachable from the heads, with its streams and attributes, to {@code dag}.
+   */
+  public void buildDAG(DAG dag)
+  {
+    for (NodeMeta nm : heads) {
+      visitNode(nm, dag);
+    }
+  }
+
+  /**
+   * Adds {@code nm}'s operator, recursively adds its children, then adds {@code nm}'s outgoing streams and
+   * applies its port and operator attributes. Children are added before the streams, presumably so that the
+   * downstream operators already exist when a stream to them is added.
+   */
+  private void visitNode(NodeMeta nm, DAG dag)
+  {
+    dag.addOperator(nm.nodeName, nm.operator);
+    for (NodeMeta child : nm.children) {
+      visitNode(child, dag);
+    }
+
+    for (Map.Entry<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> entry : nm.nodeStreams.entrySet()) {
+      // skip output ports that are null or not connected to anything
+      if (entry.getKey() == null || entry.getValue().getLeft() == null || entry.getValue().getLeft().isEmpty()) {
+        continue;
+      }
+      // the stream is named after the output port's toString()
+      DAG.StreamMeta streamMeta = dag.addStream(entry.getKey().toString(), entry.getKey(),
+          entry.getValue().getLeft().toArray(new Operator.InputPort[]{}));
+      // set locality
+      if (entry.getValue().getRight() != null) {
+        streamMeta.setLocality(entry.getValue().getRight());
+      }
+      //set attributes for output port
+      if (nm.outputPortAttributes.containsKey(entry.getKey())) {
+        for (Pair<Attribute, Object> attr : nm.outputPortAttributes.get(entry.getKey())) {
+          dag.setOutputPortAttribute(entry.getKey(), attr.getLeft(), attr.getValue());
+        }
+      }
+    }
+
+    for (Operator.InputPort input : nm.operatorInputs) {
+      //set input port attributes
+      if (nm.inputPortAttributes.containsKey(input)) {
+        for (Pair<Attribute, Object> attr : nm.inputPortAttributes.get(input)) {
+          dag.setInputPortAttribute(input, attr.getLeft(), attr.getValue());
+        }
+      }
+    }
+
+    // set operator attributes
+    for (Pair<Attribute, Object> attr : nm.operatorAttributes) {
+      dag.setAttribute(nm.operator, attr.getLeft(), attr.getValue());
+    }
+
+  }
+
+  /**
+   * Adds a node for {@code operator}.
+   *
+   * @param nodeName name of the operator in the DAG
+   * @param operator the operator
+   * @param parent upstream node, or {@code null} to add the node as a head
+   * @param parentOutput output port of {@code parent} feeding the new node (ignored when {@code parent} is null)
+   * @param inputPort input port of {@code operator} connected to {@code parentOutput}
+   * @return the new node
+   * @throws IllegalArgumentException if {@code parentOutput} is not a known output port of {@code parent};
+   *         the graph is left unchanged in that case
+   */
+  public NodeMeta addNode(String nodeName, Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort)
+  {
+
+    NodeMeta newNode = new NodeMeta(operator, nodeName);
+    if (parent == null) {
+      heads.add(newNode);
+    } else {
+      Pair<List<Operator.InputPort>, DAG.Locality> stream = parentOutput == null ? null : parent.nodeStreams.get(parentOutput);
+      if (stream == null) {
+        throw new IllegalArgumentException("Output port " + parentOutput + " is not an output port of operator " + parent.nodeName);
+      }
+      stream.getLeft().add(inputPort);
+      parent.children.add(newNode);
+      newNode.parent.add(parent);
+    }
+    return newNode;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
new file mode 100644
index 0000000..36200f2
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.UUID;
+
+import com.datatorrent.api.Operator;
+
+import static java.lang.System.currentTimeMillis;
+
+/**
+ * Helpers that build operator names meant to be unique within a DAG.
+ * Each name is the operator class's fully qualified name followed directly by a suffix.
+ */
+public class IDGenerator
+{
+
+  /**
+   * Appends the current wall-clock time in milliseconds to the class name.
+   * Two calls for the same class within the same millisecond yield the same id.
+   */
+  public static String generateOperatorIDWithClock(Class<? extends Operator> operatorClazz)
+  {
+    String prefix = operatorClazz.getName();
+    return prefix.concat(String.valueOf(currentTimeMillis()));
+  }
+
+  /**
+   * Appends a random {@link UUID} to the class name.
+   */
+  public static String generateOperatorIDWithUUID(Class<? extends Operator> operatorClazz)
+  {
+    String prefix = operatorClazz.getName();
+    return prefix.concat(UUID.randomUUID().toString());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
new file mode 100644
index 0000000..70a11b5
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+import org.apache.apex.malhar.stream.api.ApexStream;
+
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+
+/**
+ * A Factory class to build streams from different kinds of input sources
+ */
+public class StreamFactory
+{
+  /**
+   * Starts a stream from a {@link LineByLineFileInputOperator} that scans {@code folderName}.
+   *
+   * @param inputOperatorName name of the input operator in the DAG
+   * @param folderName directory to read from
+   */
+  public static ApexStream<String> fromFolder(String inputOperatorName, String folderName)
+  {
+    LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator();
+    fileLineInputOperator.setDirectory(folderName);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(inputOperatorName, fileLineInputOperator, null, fileLineInputOperator.output);
+  }
+
+  /**
+   * Same as {@link #fromFolder(String, String)} with the operator named "FolderScanner".
+   */
+  public static ApexStream<String> fromFolder(String folderName)
+  {
+    return fromFolder("FolderScanner", folderName);
+  }
+
+  /**
+   * Same as {@link #fromKafka08(String, String, String)} with the operator named "Kafka08Input".
+   */
+  public static ApexStream<String> fromKafka08(String zookeepers, String topic)
+  {
+    return fromKafka08("Kafka08Input", zookeepers, topic);
+  }
+
+  /**
+   * Starts a stream from a {@link KafkaSinglePortStringInputOperator} whose consumer reads {@code topic}
+   * through the given zookeeper quorum.
+   *
+   * @param inputName name of the input operator in the DAG
+   * @param zookeepers zookeeper connection string handed to the consumer
+   * @param topic topic to consume
+   */
+  public static ApexStream<String> fromKafka08(String inputName, String zookeepers, String topic)
+  {
+    KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
+    kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic);
+    kafkaSinglePortStringInputOperator.getConsumer().setZookeeper(zookeepers);
+    ApexStreamImpl<String> newStream = new ApexStreamImpl<>();
+    // use the caller-supplied name (previously ignored in favour of the operator's toString())
+    return newStream.addOperator(inputName, kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort);
+  }
+
+  /**
+   * Starts a stream from an arbitrary input operator.
+   *
+   * @param inputOperatorName name of the input operator in the DAG
+   * @param operator the input operator
+   * @param outputPort the output port of {@code operator} that feeds the stream
+   */
+  public static <T> ApexStream<T> fromInput(String inputOperatorName, InputOperator operator, Operator.OutputPort<T> outputPort)
+  {
+    ApexStreamImpl<T> newStream = new ApexStreamImpl<>();
+    return newStream.addOperator(inputOperatorName, operator, null, outputPort);
+  }
+
+  /**
+   * Same as {@link #fromInput(String, InputOperator, Operator.OutputPort)} with the operator named after its
+   * {@code toString()}.
+   */
+  public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort)
+  {
+    return fromInput(operator.toString(), operator, outputPort);
+  }
+
+  /**
+   * Kafka 0.9 input is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public static ApexStream<String> fromKafka09(String name, String brokers, String topic)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Kafka 0.9 input is not implemented yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public static ApexStream<String> fromKafka09(String brokers, String topic)
+  {
+    return fromKafka09("KafkaInput", brokers, topic);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
new file mode 100644
index 0000000..9628354
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.FieldVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.MethodVisitor;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+/**
+ * Because anonymous class serialization is not supported by default in most serialization libraries,
+ * this class is used to modify the bytecode of an anonymous class at runtime. The class is made public,
+ * its own inner/outer class records are dropped so that it looks like a top-level class, its constructors are
+ * made public, and a public no-arg constructor is added if none exists.
+ * The limitation is that the anonymous class being modified must be stateless, and its superclass must have
+ * an accessible no-arg constructor when one has to be synthesized.
+ */
+public class AnnonymousClassModifier extends ClassVisitor
+{
+  /** Internal name of the class being visited. */
+  private String className;
+
+  /** Internal name of the direct superclass; a synthesized constructor has to invoke its constructor. */
+  private String superName = "java/lang/Object";
+
+  /** Whether the class already declares a {@code <init>()V} constructor. */
+  private boolean hasDefaultConstructor = false;
+
+  public AnnonymousClassModifier(int i)
+  {
+    super(i);
+  }
+
+  public AnnonymousClassModifier(int i, ClassVisitor classVisitor)
+  {
+    super(i, classVisitor);
+  }
+
+  @Override
+  public void visit(int i, int i1, String s, String s1, String s2, String[] strings)
+  {
+    className = s;
+    if (s2 != null) {
+      superName = s2;
+    }
+    // Replace the original access flags with ACC_PUBLIC | ACC_SUPER (== 33): a public class
+    super.visit(i, Opcodes.ACC_PUBLIC | Opcodes.ACC_SUPER, s, s1, s2, strings);
+  }
+
+  @Override
+  public void visitSource(String s, String s1)
+  {
+    super.visitSource(s, s1);
+  }
+
+  @Override
+  public void visitOuterClass(String s, String s1, String s2)
+  {
+    // skip outer class, make it top level. For now only one level anonymous class
+    return;
+  }
+
+  @Override
+  public AnnotationVisitor visitAnnotation(String s, boolean b)
+  {
+    return super.visitAnnotation(s, b);
+  }
+
+  @Override
+  public void visitAttribute(Attribute attribute)
+  {
+    super.visitAttribute(attribute);
+  }
+
+  @Override
+  public void visitInnerClass(String s, String s1, String s2, int i)
+  {
+    // drop the record describing this class itself as an inner class
+    if (s.equals(className)) {
+      return;
+    }
+    super.visitInnerClass(s, s1, s2, i);
+  }
+
+  @Override
+  public FieldVisitor visitField(int i, String s, String s1, String s2, Object o)
+  {
+    return super.visitField(i, s, s1, s2, o);
+  }
+
+  @Override
+  public MethodVisitor visitMethod(int i, String s, String s1, String s2, String[] strings)
+  {
+    int access = i;
+    if ("<init>".equals(s)) {
+      // make the constructor public (clearing conflicting visibility flags first)
+      access = (i & ~(Opcodes.ACC_PRIVATE | Opcodes.ACC_PROTECTED)) | Opcodes.ACC_PUBLIC;
+      // only a no-arg constructor counts; other "()V" methods must not suppress constructor synthesis
+      if ("()V".equals(s1)) {
+        hasDefaultConstructor = true;
+      }
+    }
+    return super.visitMethod(access, s, s1, s2, strings);
+  }
+
+  @Override
+  public void visitEnd()
+  {
+    // If there is no no-arg constructor, create one that only calls the superclass no-arg constructor
+    if (!hasDefaultConstructor) {
+      MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC, "<init>", "()V", null, null);
+      if (mv != null) {
+        mv.visitCode();
+        mv.visitVarInsn(Opcodes.ALOAD, 0);
+        // the verifier requires the direct superclass constructor here, not necessarily Object's
+        mv.visitMethodInsn(Opcodes.INVOKESPECIAL, superName, "<init>", "()V");
+        mv.visitInsn(Opcodes.RETURN);
+        mv.visitMaxs(1, 1);
+        mv.visitEnd();
+      }
+    }
+
+    super.visitEnd();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
new file mode 100644
index 0000000..61ec10b
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+
+import java.util.Map;
+
+public class ByteArrayClassLoader extends ClassLoader
+{
+ private final Map<String, byte[]> classes;
+
+ public ByteArrayClassLoader(Map<String, byte[]> classes)
+ {
+ this.classes = classes;
+ }
+
+ public ByteArrayClassLoader(Map<String, byte[]> classes, ClassLoader parent)
+ {
+ super(parent);
+ this.classes = classes;
+ }
+
+ protected Class findClass(String name) throws ClassNotFoundException
+ {
+ byte[] data = (byte[])((byte[])this.classes.get(name));
+ if (data == null) {
+ throw new ClassNotFoundException(name);
+ } else {
+ return super.defineClass(name, data, 0, data.length);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
new file mode 100644
index 0000000..5f80062
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.operator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.commons.io.IOUtils;
+
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
+import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * Operators that wrap the functions.
+ *
+ * <p>How the wrapped function is kept depends on its class:</p>
+ * <ul>
+ * <li>An anonymous class is captured as rewritten bytecode (see {@link #functionClassData(Function)}). It is
+ * re-instantiated lazily through a {@link ByteArrayClassLoader} into the transient {@code statelessF} field,
+ * so any state it holds is not preserved.</li>
+ * <li>Any other function is kept in the non-transient {@code statefulF} field, so it is serialized together
+ * with the operator.</li>
+ * </ul>
+ *
+ * @param <OUT> type of the tuples emitted on {@link #output}
+ * @param <FUNCTION> type of the wrapped function
+ */
+public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator
+{
+  /** Serialized anonymous function class; see {@link #functionClassData(Function)} for the layout. */
+  private byte[] annonymousFunctionClass;
+
+  protected transient FUNCTION statelessF;
+
+  protected FUNCTION statefulF;
+
+  protected boolean stateful = false;
+
+  protected boolean isAnnonymous = false;
+
+  public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>();
+
+  /**
+   * Wraps {@code f}.
+   *
+   * @param f the function to wrap; if it is an anonymous class it must be stateless
+   */
+  public FunctionOperator(FUNCTION f)
+  {
+    isAnnonymous = f.getClass().isAnonymousClass();
+    if (isAnnonymous) {
+      annonymousFunctionClass = functionClassData(f);
+    } else {
+      // Every named function (Function.Stateful or not) goes into the non-transient field. A function kept
+      // only in the transient statelessF field would be null after the operator is deserialized, and
+      // readFunction() could not rebuild it because only anonymous classes have their bytecode captured.
+      statefulF = f;
+      stateful = true;
+    }
+  }
+
+  /**
+   * Captures the bytecode of an anonymous function class so that it can be re-defined later.
+   *
+   * <p>Every '$' inside occurrences of the class's internal name is replaced with '_', and the class is then
+   * rewritten by {@link AnnonymousClassModifier}. The returned layout is: {@code int} name length, the UTF-8
+   * binary name (with '$' replaced by '_'), {@code int} class length, the class bytes.</p>
+   */
+  private byte[] functionClassData(Function f)
+  {
+    Class<? extends Function> classT = f.getClass();
+    String className = classT.getName();
+    String resourceName = className.replace('.', '/') + ".class";
+
+    byte[] classBytes;
+    try (InputStream classStream = classT.getClassLoader().getResourceAsStream(resourceName)) {
+      if (classStream == null) {
+        throw new IllegalStateException("Can not find the bytecode of class " + className);
+      }
+      classBytes = IOUtils.toByteArray(classStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // class files store names in (modified) UTF-8, so search with UTF-8 bytes of the internal name
+    replaceDollarSigns(classBytes, className.replace('.', '/').getBytes(StandardCharsets.UTF_8));
+
+    ClassReader cr = new ClassReader(classBytes);
+    ClassWriter cw = new ClassWriter(0);
+    cr.accept(new AnnonymousClassModifier(Opcodes.ASM4, cw), 0);
+    classBytes = cw.toByteArray();
+
+    byte[] newNameBytes = className.replace('$', '_').getBytes(StandardCharsets.UTF_8);
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream(4 + newNameBytes.length + 4 + classBytes.length);
+    try (DataOutputStream output = new DataOutputStream(buffer)) {
+      output.writeInt(newNameBytes.length);
+      output.write(newNameBytes);
+      output.writeInt(classBytes.length);
+      output.write(classBytes);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return buffer.toByteArray();
+  }
+
+  /**
+   * Replaces every '$' with '_' inside each occurrence of {@code name} in {@code data}, in place.
+   * Occurrences are matched left to right and do not overlap.
+   */
+  private static void replaceDollarSigns(byte[] data, byte[] name)
+  {
+    if (name.length == 0) {
+      return;
+    }
+    int start = 0;
+    while (start <= data.length - name.length) {
+      if (regionMatches(data, start, name)) {
+        for (int p = start; p < start + name.length; p++) {
+          if (data[p] == '$') {
+            data[p] = '_';
+          }
+        }
+        start += name.length;
+      } else {
+        start++;
+      }
+    }
+  }
+
+  /** Returns whether {@code data} contains {@code name} starting at {@code offset}. */
+  private static boolean regionMatches(byte[] data, int offset, byte[] name)
+  {
+    for (int k = 0; k < name.length; k++) {
+      if (data[offset + k] != name[k]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Default constructor to make kryo happy
+   */
+  public FunctionOperator()
+  {
+
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    readFunction();
+  }
+
+  /**
+   * Makes sure a function instance is available. If neither field holds one, the anonymous class captured by
+   * {@link #functionClassData(Function)} is defined in a new {@link ByteArrayClassLoader} and instantiated
+   * with its no-arg constructor into {@code statelessF}.
+   *
+   * @throws IllegalStateException if there is no function instance and no captured class to rebuild one from
+   */
+  @SuppressWarnings("unchecked")
+  private void readFunction()
+  {
+    if (statelessF != null || statefulF != null) {
+      return;
+    }
+    if (annonymousFunctionClass == null) {
+      throw new IllegalStateException("No function to restore: neither a function instance nor anonymous class data is available");
+    }
+    try {
+      DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
+      byte[] classNameBytes = new byte[input.readInt()];
+      input.readFully(classNameBytes);
+      String className = new String(classNameBytes, StandardCharsets.UTF_8);
+      byte[] classData = new byte[input.readInt()];
+      input.readFully(classData);
+      Map<String, byte[]> classBin = new HashMap<>();
+      classBin.put(className, classData);
+      ByteArrayClassLoader byteArrayClassLoader = new ByteArrayClassLoader(classBin, Thread.currentThread().getContextClassLoader());
+      statelessF = ((Class<FUNCTION>)byteArrayClassLoader.findClass(className)).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  /**
+   * Returns the wrapped function, rebuilding an anonymous one first if needed.
+   */
+  public FUNCTION getFunction()
+  {
+    readFunction();
+    if (stateful) {
+      return statefulF;
+    } else {
+      return statelessF;
+    }
+  }
+
+  public FUNCTION getStatelessF()
+  {
+    return statelessF;
+  }
+
+  public void setStatelessF(FUNCTION statelessF)
+  {
+    this.statelessF = statelessF;
+  }
+
+  public FUNCTION getStatefulF()
+  {
+    return statefulF;
+  }
+
+  public void setStatefulF(FUNCTION statefulF)
+  {
+    this.statefulF = statefulF;
+  }
+
+  public boolean isStateful()
+  {
+    return stateful;
+  }
+
+  public void setStateful(boolean stateful)
+  {
+    this.stateful = stateful;
+  }
+
+  public boolean isAnnonymous()
+  {
+    return isAnnonymous;
+  }
+
+  public void setIsAnnonymous(boolean isAnnonymous)
+  {
+    this.isAnnonymous = isAnnonymous;
+  }
+
+  /**
+   * Emits {@code f(t)} for every input tuple {@code t}.
+   */
+  public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>>
+  {
+
+    public MapFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.MapFunction<IN, OUT> f = getFunction();
+        output.emit(f.f(t));
+      }
+    };
+
+    public MapFunctionOperator(Function.MapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+  /**
+   * Emits every element of {@code f(t)} for every input tuple {@code t}.
+   */
+  public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>>
+  {
+
+    public FlatMapFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FlatMapFunction<IN, OUT> f = getFunction();
+        for (OUT out : f.f(t)) {
+          output.emit(out);
+        }
+      }
+    };
+
+    public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f)
+    {
+      super(f);
+    }
+  }
+
+  /**
+   * Folds every input tuple into an accumulator that starts at the initial value, and emits the accumulator
+   * after each tuple.
+   */
+  public static class FoldFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FoldFunction<IN, OUT>>
+  {
+
+    public FoldFunctionOperator()
+    {
+
+    }
+
+    @NotNull
+    private OUT foldVal;
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FoldFunction<IN, OUT> f = getFunction();
+        // fold the value
+        foldVal = f.fold(t, foldVal);
+        output.emit(foldVal);
+      }
+    };
+
+    public FoldFunctionOperator(Function.FoldFunction<IN, OUT> f, OUT initialVal)
+    {
+      super(f);
+      this.foldVal = initialVal;
+    }
+  }
+
+  /**
+   * Emits the running reduction of the input tuples. The first tuple only seeds the accumulator and is not
+   * emitted. NOTE(review): confirm that not emitting the first tuple is intended.
+   */
+  public static class ReduceFunctionOperator<IN> extends FunctionOperator<IN, Function.ReduceFunction<IN>>
+  {
+
+    public ReduceFunctionOperator()
+    {
+
+    }
+
+    // Deliberately not @NotNull: the accumulator is null until the first tuple arrives (process() relies on
+    // that), so such a constraint would be violated by every freshly constructed operator.
+    private IN reducedVal;
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.ReduceFunction<IN> f = getFunction();
+        // the first tuple seeds the accumulator
+        if (reducedVal == null) {
+          reducedVal = t;
+          return;
+        }
+        reducedVal = f.reduce(t, reducedVal);
+        output.emit(reducedVal);
+      }
+    };
+
+    public ReduceFunctionOperator(Function.ReduceFunction<IN> f)
+    {
+      super(f);
+    }
+  }
+
+  /**
+   * Emits the input tuples for which the filter function returns true.
+   */
+  public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>>
+  {
+
+    public FilterFunctionOperator()
+    {
+
+    }
+
+    public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>()
+    {
+      @Override
+      public void process(IN t)
+      {
+        Function.FilterFunction<IN> f = getFunction();
+        // keep only the tuples accepted by the filter
+        if (f.f(t)) {
+          output.emit(t);
+        }
+      }
+    };
+
+    public FilterFunctionOperator(Function.FilterFunction<IN> f)
+    {
+      super(f);
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
new file mode 100644
index 0000000..755521f
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.util;
+
/**
 * A tuple that carries its own key.
 * <p>
 * Used internally by the stream API to identify the key of a tuple.
 *
 * @param <K> the type of the key
 */
public interface KeyedTuple<K>
{
  /**
   * Returns the key of this tuple.
   *
   * @return the key of this tuple
   */
  K getKey();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
new file mode 100644
index 0000000..f6c2552
--- /dev/null
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.util;
+
/**
 * Tuple helper utilities for the stream API.
 * <p>
 * Intended to hold helpers that extract the fields used as a key or value, that convert data
 * tuples into display tuples, and that generate watermark tuples.
 */
public class TupleUtil
{
  /**
   * An empty nested type. NOTE(review): it appears to serve as a placeholder/marker meaning
   * "no type"; confirm its usage before relying on this.
   */
  public interface NONE
  {
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
new file mode 100644
index 0000000..71b9a82
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.api.impl;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Unit test to default implementation of ApexStream interface
+ */
+public class ApexStreamImplTest
+{
+
+ @Test
+ public void testAddOperator()
+ {
+ LogicalPlan dag = new LogicalPlan();
+ TestOperator<String, Integer> firstOperator = new TestOperator<>();
+ TestOperator<Integer, Date> secondOperator = new TestOperator<>();
+ new ApexStreamImpl<String>().addOperator("first", firstOperator, null, firstOperator.output)
+ .addOperator("second", secondOperator, secondOperator.input, null)
+ .with(DAG.Locality.THREAD_LOCAL)
+ .with(Context.OperatorContext.AUTO_RECORD, true)
+ .with("prop", "TestProp").populateDag(dag);
+ Assert.assertTrue(dag.getAllOperators().size() == 2);
+ Set<String> opNames = new HashSet<>();
+ opNames.add("first");
+ opNames.add("second");
+ for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
+ Assert.assertTrue(operatorMeta.getOperator() instanceof TestOperator);
+ Assert.assertTrue(opNames.contains(operatorMeta.getName()));
+ if (operatorMeta.getName().equals("second")) {
+ // Assert the Context.OperatorContext.AUTO_RECORD attribute has been set to true for second operator
+ Assert.assertTrue(operatorMeta.getAttributes().get(Context.OperatorContext.AUTO_RECORD));
+ // Assert the prop has been set to TestProp for second operator
+ Assert.assertEquals("TestProp", ((TestOperator)operatorMeta.getOperator()).prop);
+ } else {
+ // Assert the Context.OperatorContext.AUTO_RECORD attribute keeps as default false for first operator
+ Assert.assertNull(operatorMeta.getAttributes().get(Context.OperatorContext.AUTO_RECORD));
+ // Assert the prop has not been set for first operator
+ Assert.assertNull(((TestOperator)operatorMeta.getOperator()).prop);
+ }
+ }
+
+ Collection<LogicalPlan.StreamMeta> streams = dag.getAllStreams();
+ // Assert there is only one stream
+ Assert.assertTrue(streams.size() == 1);
+ for (LogicalPlan.StreamMeta stream : streams) {
+
+ // Assert the stream is from first operator to second operator
+ Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName());
+ Assert.assertTrue(1 == stream.getSinks().size());
+ Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName());
+
+ // Assert the stream is thread local
+ Assert.assertTrue(stream.getLocality() == DAG.Locality.THREAD_LOCAL);
+ }
+
+ }
+
+ /**
+ * A mock operator for test
+ * @param <T>
+ * @param <O>
+ */
+ public static class TestOperator<T, O> extends BaseOperator
+ {
+
+ private String prop = null;
+
+ public void setProp(String prop)
+ {
+ this.prop = prop;
+ }
+
+ public String getProp()
+ {
+ return prop;
+ }
+
+ public final transient InputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T o)
+ {
+
+ }
+ };
+
+ public final transient OutputPort<O> output = new DefaultOutputPort<>();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
new file mode 100644
index 0000000..44f76b1
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+/**
+ * An application example with stream api
+ */
+@ApplicationAnnotation(name = "WordCountStreamingApiDemo")
+public class ApplicationWithStreamAPI implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration configuration)
+ {
+ String localFolder = "./src/test/resources/data";
+ ApexStream stream = StreamFactory
+ .fromFolder(localFolder)
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split(" "));
+ }
+ });
+ stream.print();
+ stream.countByKey().print();
+ stream.populateDag(dag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
new file mode 100644
index 0000000..70f26f2
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+/**
+ * A application test using stream api
+ */
+public class ApplicationWithStreamAPITest
+{
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationWithStreamAPITest.class);
+
+ public ApplicationWithStreamAPITest()
+ {
+ }
+
+ @Test
+ public void testWordcountApplication() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new ApplicationWithStreamAPI(), conf);
+ LocalMode.Controller lc = lma.getController();
+ long start = System.currentTimeMillis();
+ lc.run(5000);
+ long end = System.currentTimeMillis();
+ long time = end - start;
+ logger.info("Test used " + time + " ms");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
new file mode 100644
index 0000000..d679135
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.StreamFactory;
+
+
+/**
+ * A embedded application test without creating Streaming Application
+ */
+public class LocalTestWithoutStreamApplication
+{
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNonStreamApplicationWordcount() throws Exception
+ {
+
+ TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>();
+ collector.id = "testNonStreamApplicationWordcount";
+ final Map<Object, Integer> expected = new HashMap<>();
+ expected.put("error", 2);
+ expected.put("word1", 4);
+ expected.put("word2", 8);
+ expected.put("word3", 4);
+ expected.put("word4", 4);
+ expected.put("word5", 4);
+ expected.put("word7", 4);
+ expected.put("word9", 6);
+
+ Callable<Boolean> exitCondition = new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
+ return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1));
+ }
+ };
+
+
+ StreamFactory.fromFolder("./src/test/resources/data")
+ .flatMap(new Function.FlatMapFunction<String, String>()
+ {
+ @Override
+ public Iterable<String> f(String input)
+ {
+ return Arrays.asList(input.split(" "));
+ }
+ })
+ .countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition);
+
+
+ List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount");
+
+ Assert.assertNotNull(data);
+ Assert.assertTrue(data.size() > 1);
+ Assert.assertEquals(expected, data.get(data.size() - 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ab76dacd/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
new file mode 100644
index 0000000..4958a8e
--- /dev/null
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.stream.sample;
+
+import org.apache.apex.malhar.stream.api.ApexStream;
+import org.apache.apex.malhar.stream.api.function.Function;
+import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
+
+import com.datatorrent.api.DAG;
+
+/**
+ * An example to create your own stream
+ */
+public class MyStream<T> extends ApexStreamImpl<T>
+{
+
+ public MyStream(ApexStream<T> apexStream)
+ {
+ super(apexStream);
+ }
+
+ <O> MyStream<O> myFilterAndMap(Function.MapFunction<T, O> map, Function.FilterFunction<T> filterFunction)
+ {
+ return filter(filterFunction).map(map).with(DAG.Locality.THREAD_LOCAL);
+ }
+
+}