You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:45 UTC
[08/51] [abbrv] git commit: [streaming] Added CoFunctions for two type inputs
[streaming] Added CoFunctions for two type inputs
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f436690f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f436690f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f436690f
Branch: refs/heads/master
Commit: f436690fb4ec4d9b5dc7342b5124b6fb0e855d76
Parents: 6e52195
Author: ghermann <re...@gmail.com>
Authored: Tue Jul 22 11:28:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:14 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 7 +
.../flink/streaming/api/JobGraphBuilder.java | 95 ++++++---
.../api/StreamExecutionEnvironment.java | 43 +++-
.../api/function/co/CoMapFunction.java | 29 +++
.../api/invokable/operator/co/CoInvokable.java | 55 ++++++
.../invokable/operator/co/CoMapInvokable.java | 53 +++++
.../AbstractStreamComponent.java | 144 +++-----------
.../api/streamcomponent/CoStreamTask.java | 198 +++++++++++++++++++
.../SingleInputAbstractStreamComponent.java | 129 ++++++++++++
.../streamcomponent/StreamIterationSink.java | 2 +-
.../streamcomponent/StreamIterationSource.java | 2 +-
.../api/streamcomponent/StreamSink.java | 2 +-
.../api/streamcomponent/StreamSource.java | 2 +-
.../api/streamcomponent/StreamTask.java | 2 +-
.../api/invokable/operator/CoMapTest.java | 77 ++++++++
15 files changed, 675 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 4356795..6ef2faf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -387,6 +389,11 @@ public class DataStream<T extends Tuple> {
new BatchReduceInvokable<T, R>(reducer, batchSize));
}
+ public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
+ return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+ }
+
+
/**
* Applies a reduce transformation on preset "time" chunks of the
* DataStream. The transformation calls a {@link GroupReduceFunction} on
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 2b3ee5a..022fcd8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -41,12 +41,13 @@ import org.apache.flink.runtime.jobgraph.JobInputVertex;
import org.apache.flink.runtime.jobgraph.JobOutputVertex;
import org.apache.flink.runtime.jobgraph.JobTaskVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
import org.apache.flink.streaming.api.streamcomponent.StreamSink;
@@ -70,8 +71,8 @@ public class JobGraphBuilder {
// Graph attributes
private Map<String, AbstractJobVertex> components;
private Map<String, Integer> componentParallelism;
- private Map<String, Boolean> mutability;
- private Map<String, List<String>> outEdgeList;
+ private Map<String, ArrayList<String>> outEdgeList;
+ private Map<String, ArrayList<Integer>> outEdgeType;
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
private Map<String, String> userDefinedNames;
@@ -101,8 +102,8 @@ public class JobGraphBuilder {
components = new HashMap<String, AbstractJobVertex>();
componentParallelism = new HashMap<String, Integer>();
- mutability = new HashMap<String, Boolean>();
- outEdgeList = new HashMap<String, List<String>>();
+ outEdgeList = new HashMap<String, ArrayList<String>>();
+ outEdgeType = new HashMap<String, ArrayList<Integer>>();
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
userDefinedNames = new HashMap<String, String>();
@@ -171,7 +172,7 @@ public class JobGraphBuilder {
setBytesFrom(iterationHead, componentName);
setEdge(componentName, iterationHead,
- connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0));
+ connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0);
if (LOG.isDebugEnabled()) {
LOG.debug("ITERATION SOURCE: " + componentName);
@@ -204,6 +205,18 @@ public class JobGraphBuilder {
}
}
+ /**
+ * Adds a co-task (a task with two inputs) to the JobGraph with the given
+ * parameters. The component is registered with {@link CoStreamTask} as its
+ * component class.
+ *
+ * @param componentName
+ * Name of the component
+ * @param taskInvokableObject
+ * User defined two-input operator
+ * @param operatorName
+ * Operator name (e.g. {@code "coMap"})
+ * @param serializedFunction
+ * Serialized user defined function
+ * @param parallelism
+ * Number of parallel instances created
+ */
+ public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> void addCoTask(
+ String componentName, CoInvokable<IN1, IN2, OUT> taskInvokableObject,
+ String operatorName, byte[] serializedFunction, int parallelism) {
+ addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
+ serializedFunction, parallelism);
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+ LOG.debug("CO-TASK: " + componentName);
+ }
+
/**
* Adds sink to the JobGraph with the given parameters
*
@@ -218,7 +231,7 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
- public void addSink(String componentName, SinkInvokable<? extends Tuple> InvokableObject,
+ public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
@@ -288,11 +301,11 @@ public class JobGraphBuilder {
componentClasses.put(componentName, componentClass);
setParallelism(componentName, parallelism);
- mutability.put(componentName, false);
invokableObjects.put(componentName, invokableObject);
operatorNames.put(componentName, operatorName);
serializedFunctions.put(componentName, serializedFunction);
outEdgeList.put(componentName, new ArrayList<String>());
+ outEdgeType.put(componentName, new ArrayList<Integer>());
inEdgeList.put(componentName, new ArrayList<String>());
connectionTypes.put(componentName, new ArrayList<StreamPartitioner<? extends Tuple>>());
iterationTailCount.put(componentName, 0);
@@ -321,7 +334,8 @@ public class JobGraphBuilder {
if (componentClass.equals(StreamSource.class)
|| componentClass.equals(StreamIterationSource.class)) {
component = new JobInputVertex(componentName, this.jobGraph);
- } else if (componentClass.equals(StreamTask.class)) {
+ } else if (componentClass.equals(StreamTask.class)
+ || componentClass.equals(CoStreamTask.class)) {
component = new JobTaskVertex(componentName, this.jobGraph);
} else if (componentClass.equals(StreamSink.class)
|| componentClass.equals(StreamIterationSink.class)) {
@@ -334,9 +348,7 @@ public class JobGraphBuilder {
LOG.debug("Parallelism set: " + parallelism + " for " + componentName);
}
- Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
-
- config.setBoolean("isMutable", mutability.get(componentName));
+ Configuration config = component.getConfiguration();
// Set vertex config
if (invokableObject != null) {
@@ -428,10 +440,6 @@ public class JobGraphBuilder {
componentParallelism.put(componentName, parallelism);
}
- public void setMutability(String componentName, boolean isMutable) {
- mutability.put(componentName, isMutable);
- }
-
/**
* Connects two vertices in the JobGraph using the selected partitioner
* settings
@@ -442,10 +450,13 @@ public class JobGraphBuilder {
* Name of the downstream(input) vertex
* @param partitionerObject
* Partitioner object
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public void setEdge(String upStreamComponentName, String downStreamComponentName,
- StreamPartitioner<? extends Tuple> partitionerObject) {
+ StreamPartitioner<? extends Tuple> partitionerObject, int typeNumber) {
outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
+ outEdgeType.get(upStreamComponentName).add(typeNumber);
inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
connectionTypes.get(upStreamComponentName).add(partitionerObject);
}
@@ -463,10 +474,13 @@ public class JobGraphBuilder {
* @param downStreamComponentName
* Name of the downstream component, that will receive the
* records
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void broadcastConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName) {
- setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>());
+ String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+ setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>(),
+ typeNumber);
}
/**
@@ -485,12 +499,15 @@ public class JobGraphBuilder {
* records
* @param keyPosition
* Position of key in the tuple
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void fieldsConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int keyPosition) {
+ String upStreamComponentName, String downStreamComponentName, int keyPosition,
+ int typeNumber) {
setEdge(upStreamComponentName, downStreamComponentName, new FieldsPartitioner<T>(
- keyPosition));
+ keyPosition), typeNumber);
}
/**
@@ -505,10 +522,13 @@ public class JobGraphBuilder {
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void globalConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName) {
- setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>());
+ String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+ setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>(),
+ typeNumber);
}
/**
@@ -523,10 +543,13 @@ public class JobGraphBuilder {
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void shuffleConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName) {
- setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>());
+ String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+ setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>(),
+ typeNumber);
}
/**
@@ -542,10 +565,13 @@ public class JobGraphBuilder {
* Name of the upstream component, that will emit the tuples
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
public <T extends Tuple> void forwardConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName) {
- setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>());
+ String upStreamComponentName, String downStreamComponentName, int typeNumber) {
+ setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
+ typeNumber);
}
/**
@@ -565,8 +591,7 @@ public class JobGraphBuilder {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
- Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
- .getConfiguration();
+ Configuration config = upStreamComponent.getConfiguration();
try {
if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
@@ -729,15 +754,21 @@ public class JobGraphBuilder {
for (String componentName : outEdgeList.keySet()) {
createVertex(componentName);
}
-
+ int inputNumber = 0;
for (String upStreamComponentName : outEdgeList.keySet()) {
+
int i = 0;
+ ArrayList<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
+
for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
+ Configuration downStreamComponentConfig = components.get(downStreamComponentName)
+ .getConfiguration();
+ downStreamComponentConfig.setInteger("inputType_" + inputNumber++, outEdgeTypeList.get(i));
+
connect(upStreamComponentName, downStreamComponentName,
connectionTypes.get(upStreamComponentName).get(i));
i++;
}
-
}
setAutomaticInstanceSharing();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index c1eca4a..9b1a16a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -310,7 +311,7 @@ public abstract class StreamExecutionEnvironment {
throw new RuntimeException("Cannot serialize user defined function");
}
- connectGraph(inputStream, returnStream.getId());
+ connectGraph(inputStream, returnStream.getId(), 0);
if (inputStream.iterationflag) {
returnStream.addIterationSource(inputStream.iterationID.toString());
@@ -319,6 +320,30 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
+
+ /**
+ * Adds a two-input (co-)operator to the job graph and returns its output
+ * DataStream. {@code inputStream1} is connected with type number 1 and
+ * {@code inputStream2} with type number 2.
+ *
+ * @param functionName
+ * Name of the operator (e.g. {@code "coMap"})
+ * @param inputStream1
+ * First input stream
+ * @param inputStream2
+ * Second input stream
+ * @param function
+ * User defined function; serialized into the job graph
+ * @param functionInvokable
+ * Invokable running {@code function} over the two inputs
+ * @param <T1>
+ * Type of the first input
+ * @param <T2>
+ * Type of the second input
+ * @param <R>
+ * Output type
+ * @return The output DataStream of the co-operator
+ */
+ protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(
+ String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+ final AbstractFunction function, CoInvokable<T1, T2, R> functionInvokable) {
+
+ DataStream<R> returnStream = new DataStream<R>(this, functionName);
+
+ try {
+ jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, functionName,
+ SerializationUtils.serialize(function), degreeOfParallelism);
+ } catch (SerializationException e) {
+ // keep the cause: it names the field/class that failed to serialize
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+
+ // The type number is stored per edge (outEdgeType -> "inputType_N" config)
+ // so the two inputs of the co-task can be told apart.
+ connectGraph(inputStream1, returnStream.getId(), 1);
+ connectGraph(inputStream2, returnStream.getId(), 2);
+
+ // TODO consider iteration: unlike addFunction, the iterationflag of the
+ // inputs is not checked here, so a co-function is never registered as an
+ // iteration source.
+
+ return returnStream;
+ }
protected <T extends Tuple, R extends Tuple> void addIterationSource(DataStream<T> inputStream,
String iterationID) {
@@ -340,7 +365,7 @@ public abstract class StreamExecutionEnvironment {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
String input = inputStream.connectIDs.get(i);
- jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId());
+ jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId(), 0);
}
}
@@ -368,7 +393,7 @@ public abstract class StreamExecutionEnvironment {
throw new RuntimeException("Cannot serialize SinkFunction");
}
- connectGraph(inputStream, returnStream.getId());
+ connectGraph(inputStream, returnStream.getId(), 0);
return returnStream;
}
@@ -523,8 +548,10 @@ public abstract class StreamExecutionEnvironment {
* ID of the output
* @param <T>
* type of the input stream
+ * @param typeNumber
+ * Number of the type (used at co-functions)
*/
- private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
+ private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
ConnectionType type = inputStream.ctypes.get(i);
@@ -533,16 +560,16 @@ public abstract class StreamExecutionEnvironment {
switch (type) {
case SHUFFLE:
- jobGraphBuilder.shuffleConnect(inputStream, input, outputID);
+ jobGraphBuilder.shuffleConnect(inputStream, input, outputID, typeNumber);
break;
case BROADCAST:
- jobGraphBuilder.broadcastConnect(inputStream, input, outputID);
+ jobGraphBuilder.broadcastConnect(inputStream, input, outputID, typeNumber);
break;
case FIELD:
- jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param);
+ jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param, typeNumber);
break;
case FORWARD:
- jobGraphBuilder.forwardConnect(inputStream, input, outputID);
+ jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
new file mode 100644
index 0000000..5885cbf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public abstract class CoMapFunction<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends AbstractFunction {
+ private static final long serialVersionUID = 1L;
+
+ public abstract OUT map1(IN1 value);
+ public abstract OUT map2(IN2 value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
new file mode 100644
index 0000000..733b61e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+ StreamComponentInvokable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Collector<OUT> collector;
+ protected MutableObjectIterator<StreamRecord<IN1>> recordIterator1;
+ protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
+ protected StreamRecord<IN1> reuse1;
+ protected StreamRecord<IN2> reuse2;
+
+ public void initialize(Collector<OUT> collector,
+ MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
+ StreamRecordSerializer<IN1> serializer1,
+ MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
+ StreamRecordSerializer<IN2> serializer2) {
+ this.collector = collector;
+
+ this.recordIterator1 = recordIterator1;
+ this.reuse1 = serializer1.createInstance();
+
+ this.recordIterator2 = recordIterator2;
+ this.reuse2 = serializer2.createInstance();
+ }
+
+ public abstract void invoke() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
new file mode 100644
index 0000000..4a30425
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+
+public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+ CoInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private CoMapFunction<IN1, IN2, OUT> mapper;
+
+ public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
+ this.mapper = mapper;
+ }
+
+ // TODO rework this as UnionRecordReader
+ @Override
+ public void invoke() throws Exception {
+ boolean noMoreRecordOnInput1;
+ boolean noMoreRecordOnInput2;
+
+ do {
+ noMoreRecordOnInput1 = recordIterator1.next(reuse1) == null;
+ if (!noMoreRecordOnInput1) {
+ collector.collect(mapper.map1(reuse1.getTuple()));
+ }
+
+ noMoreRecordOnInput2 = recordIterator2.next(reuse2) == null;
+ if (!noMoreRecordOnInput2) {
+ collector.collect(mapper.map2(reuse2.getTuple()));
+ }
+ } while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index f90caf8..1a51492 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -19,28 +19,18 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.util.ReaderIterator;
@@ -49,9 +39,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
@@ -59,14 +47,10 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
- AbstractInvokable {
+public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
+
private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
- protected TupleTypeInfo<IN> inTupleTypeInfo = null;
- protected StreamRecordSerializer<IN> inTupleSerializer = null;
- protected DeserializationDelegate<StreamRecord<IN>> inDeserializationDelegate = null;
-
protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
protected StreamRecordSerializer<OUT> outTupleSerializer = null;
protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
@@ -105,109 +89,17 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
}
return collector;
}
-
- protected void setSerializers() {
- byte[] operatorBytes = configuration.getBytes("operator", null);
- String operatorName = configuration.getString("operatorName", "");
-
- Object function = null;
- try {
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
- function = in.readObject();
-
- if (operatorName.equals("flatMap")) {
- setSerializerDeserializer(function, FlatMapFunction.class);
- } else if (operatorName.equals("map")) {
- setSerializerDeserializer(function, MapFunction.class);
- } else if (operatorName.equals("batchReduce")) {
- setSerializerDeserializer(function, GroupReduceFunction.class);
- } else if (operatorName.equals("filter")) {
- setDeserializer(function, FilterFunction.class);
- setSerializer(function, FilterFunction.class, 0);
- } else if (operatorName.equals("sink")) {
- setDeserializer(function, SinkFunction.class);
- } else if (operatorName.equals("source")) {
- setSerializer(function, UserSourceInvokable.class, 0);
- } else if (operatorName.equals("elements")) {
- outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
- outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
- outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
- outTupleSerializer);
- } else {
- throw new Exception("Wrong operator name!");
- }
-
- } catch (Exception e) {
- throw new StreamComponentException(e);
- // throw new StreamComponentException("Nonsupported object (named "
- // + operatorName
- // + ") passed as operator");
- }
- }
-
- private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
- setDeserializer(function, clazz);
- setSerializer(function, clazz, 1);
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
- inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
- 0, null, null);
-
- inTupleSerializer = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
- inDeserializationDelegate = new DeserializationDelegate<StreamRecord<IN>>(inTupleSerializer);
- }
+
+	/**
+	 * Sets up the output type info, serializer and serialization delegate from the
+	 * generic type argument at index {@code typeParameter} of {@code clazz}, as
+	 * resolved for the runtime class of {@code function}.
+	 */
@SuppressWarnings({ "rawtypes", "unchecked" })
- private void setSerializer(Object function, Class<?> clazz, int typeParameter) {
+ protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
typeParameter, null, null);
outTupleSerializer = new StreamRecordSerializer(outTupleTypeInfo.createSerializer());
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
}
-
- @SuppressWarnings("unchecked")
- protected void setSinkSerializer() {
- if (outSerializationDelegate != null) {
- inTupleTypeInfo = (TupleTypeInfo<IN>) outTupleTypeInfo;
-
- inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
- inDeserializationDelegate = new DeserializationDelegate<StreamRecord<IN>>(inTupleSerializer);
- }
- }
-
- @SuppressWarnings("unchecked")
- protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
- int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
-
- if (numberOfInputs < 2) {
-
- return new MutableRecordReader<IOReadableWritable>(this);
-
- } else {
- MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
- for (int i = 0; i < numberOfInputs; i++) {
- recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
- }
- return new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
- }
- }
-
- protected MutableObjectIterator<StreamRecord<IN>> createInputIterator(MutableReader<?> inputReader,
- TypeSerializer<?> serializer) {
-
- // generic data type serialization
- @SuppressWarnings("unchecked")
- MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(reader, serializer);
- return iter;
-
- }
+
protected void setConfigOutputs(List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
@@ -217,7 +109,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
setPartitioner(i, outputs);
}
}
-
+
private void setPartitioner(int outputNumber,
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
@@ -242,12 +134,12 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
+ " with " + outputNumber + " outputs");
}
} catch (Exception e) {
- throw new StreamComponentException("Cannot deserialize partitioner "
- + outputPartitioner.getClass().getSimpleName() + " with " + outputNumber
- + " outputs");
+ throw new StreamComponentException("Cannot deserialize " + outputPartitioner.getClass().getSimpleName() + " of " +
+ name + " with " + outputNumber
+ + " outputs", e);
}
}
-
+
/**
* Reads and creates a StreamComponent from the config.
*
@@ -274,12 +166,24 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
return userFunction;
}
+
+	/**
+	 * Creates a {@link ReaderIterator} over {@code inputReader} that uses
+	 * {@code serializer} for the records it reads.
+	 *
+	 * @param inputReader
+	 *            reader delivering the serialized input records
+	 * @param serializer
+	 *            serializer handed to the iterator for the input records
+	 * @return an iterator over the input {@link StreamRecord}s
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
+			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
+		// generic data type serialization: view the reader as yielding
+		// DeserializationDelegates so the raw ReaderIterator can consume it
+		MutableReader<DeserializationDelegate<?>> delegateReader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+		return new ReaderIterator(delegateReader, serializer);
+	}
+
+	/**
+	 * Deserializes {@code serializedObject} with SerializationUtils and casts the
+	 * result to the type expected by the caller. The cast is unchecked, so a type
+	 * mismatch only surfaces as a ClassCastException where the result is used.
+	 */
@SuppressWarnings("unchecked")
- private static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+ protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
ClassNotFoundException {
return (T) SerializationUtils.deserialize(serializedObject);
}
-
+
+	/**
+	 * Creates and initializes the invokable that runs this component's user code.
+	 * Implementations such as CoStreamTask read the invokable class from the
+	 * "userfunction" configuration entry.
+	 */
protected abstract void setInvokable();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
new file mode 100644
index 0000000..4c1cf42
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.streamcomponent;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+ AbstractStreamComponent<OUT> {
+ private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
+
+ protected StreamRecordSerializer<IN1> inTupleSerializer1 = null;
+ protected StreamRecordSerializer<IN2> inTupleSerializer2 = null;
+
+ private MutableReader<IOReadableWritable> inputs1;
+ private MutableReader<IOReadableWritable> inputs2;
+ MutableObjectIterator<StreamRecord<IN1>> inputIter1;
+ MutableObjectIterator<StreamRecord<IN2>> inputIter2;
+
+ private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+ private CoInvokable<IN1, IN2, OUT> userFunction;
+ private int[] numberOfOutputChannels;
+ private static int numTasks;
+
+ public CoStreamTask() {
+
+ outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ userFunction = null;
+ numTasks = newComponent();
+ instanceID = numTasks;
+ }
+
+ protected void setSerializers() {
+ byte[] operatorBytes = configuration.getBytes("operator", null);
+ String operatorName = configuration.getString("operatorName", "");
+
+ Object function = null;
+ try {
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
+ function = in.readObject();
+
+ if (operatorName.equals("coMap")) {
+ setSerializer(function, CoMapFunction.class, 2);
+ setDeserializers(function, CoMapFunction.class);
+ } else {
+ throw new Exception("Wrong operator name!");
+ }
+
+ } catch (Exception e) {
+ throw new StreamComponentException(e);
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
+
+ TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
+ function.getClass(), 0, null, null);
+ inTupleSerializer1 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+
+ inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
+ 1, null, null);
+ inTupleSerializer2 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+ }
+
+ @Override
+ public void registerInputOutput() {
+ initialize();
+
+ setSerializers();
+ setCollector();
+
+ // inputs1 = setConfigInputs();
+ setConfigInputs();
+
+ inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
+
+ // inputs2 = setConfigInputs();
+ inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
+
+ setConfigOutputs(outputs);
+
+ numberOfOutputChannels = new int[outputs.size()];
+ for (int i = 0; i < numberOfOutputChannels.length; i++) {
+ numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+ }
+
+ setInvokable();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected void setInvokable() {
+ // Default value is a CoMapInvokable
+ Class<? extends CoInvokable> userFunctionClass = configuration.getClass("userfunction",
+ CoMapInvokable.class, CoInvokable.class);
+ userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
+ userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
+ inTupleSerializer2);
+ }
+
+ protected void setConfigInputs() throws StreamComponentException {
+ int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+
+ ArrayList<MutableRecordReader<IOReadableWritable>> inputList1 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
+ ArrayList<MutableRecordReader<IOReadableWritable>> inputList2 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
+
+ for (int i = 0; i < numberOfInputs; i++) {
+ int inputType = configuration.getInteger("inputType_" + i, 0);
+ switch (inputType) {
+ case 1:
+ inputList1.add(new MutableRecordReader<IOReadableWritable>(this));
+ break;
+ case 2:
+ inputList2.add(new MutableRecordReader<IOReadableWritable>(this));
+ break;
+ default:
+ throw new RuntimeException("Invalid input type number: " + inputType);
+ }
+ }
+
+ inputs1 = getInputs(inputList1);
+ inputs2 = getInputs(inputList2);
+ }
+
+ @SuppressWarnings("unchecked")
+ private MutableReader<IOReadableWritable> getInputs(
+ ArrayList<MutableRecordReader<IOReadableWritable>> inputList) {
+ if (inputList.size() == 1) {
+ return inputList.get(0);
+ } else if (inputList.size() > 1) {
+ return new MutableUnionRecordReader<IOReadableWritable>(
+ (MutableRecordReader<IOReadableWritable>[]) inputList.toArray());
+ }
+ return null;
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
+ }
+
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+ output.initializeSerializers();
+ }
+
+ userFunction.invoke();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
+ }
+
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+ output.flush();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
new file mode 100644
index 0000000..b49620f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.streamcomponent;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+
+import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+
+/**
+ * Base class for stream components that read one logical input of type
+ * {@code IN} (possibly a union of several channels) and emit records of type
+ * {@code OUT}.
+ *
+ * @param <IN>
+ *            tuple type of the incoming records
+ * @param <OUT>
+ *            tuple type of the emitted records
+ */
+public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
+		AbstractStreamComponent<OUT> {
+
+	// input-side serializer; null until setSerializers() or setSinkSerializer() sets it
+	protected StreamRecordSerializer<IN> inTupleSerializer = null;
+
+	/**
+	 * Reads the serialized user function from the configuration and, depending on
+	 * the configured operator name, sets up the input serializer and/or the output
+	 * serializer from its generic type arguments.
+	 *
+	 * @throws StreamComponentException
+	 *             wrapping any failure: deserialization, type extraction, or an
+	 *             unknown operator name
+	 */
+	protected void setSerializers() {
+		byte[] serializedOperator = configuration.getBytes("operator", null);
+		String opName = configuration.getString("operatorName", "");
+
+		try {
+			Object operator = new ObjectInputStream(new ByteArrayInputStream(serializedOperator))
+					.readObject();
+			configureSerializersFor(opName, operator);
+		} catch (Exception e) {
+			throw new StreamComponentException(e);
+		}
+	}
+
+	/** Chooses which serializers to derive from {@code operator} based on {@code opName}. */
+	private void configureSerializersFor(String opName, Object operator) throws Exception {
+		if (opName.equals("flatMap")) {
+			setSerializerDeserializer(operator, FlatMapFunction.class);
+		} else if (opName.equals("map")) {
+			setSerializerDeserializer(operator, MapFunction.class);
+		} else if (opName.equals("batchReduce")) {
+			setSerializerDeserializer(operator, GroupReduceFunction.class);
+		} else if (opName.equals("filter")) {
+			// the output type is taken from type argument 0, i.e. the input type
+			setDeserializer(operator, FilterFunction.class);
+			setSerializer(operator, FilterFunction.class, 0);
+		} else if (opName.equals("sink")) {
+			setDeserializer(operator, SinkFunction.class);
+		} else if (opName.equals("source")) {
+			setSerializer(operator, UserSourceInvokable.class, 0);
+		} else if (opName.equals("coMap")) {
+			setSerializer(operator, CoMapFunction.class, 2);
+		} else if (opName.equals("elements")) {
+			setSerializerFromInstance(operator);
+		} else {
+			throw new Exception("Wrong operator name: " + opName);
+		}
+	}
+
+	/** Derives the output serializer from the runtime value of {@code operator} itself. */
+	private void setSerializerFromInstance(Object operator) {
+		outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(operator));
+		outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
+		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
+	}
+
+	/** Input serializer from type argument 0, output serializer from type argument 1 of {@code clazz}. */
+	private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
+		setDeserializer(function, clazz);
+		setSerializer(function, clazz, 1);
+	}
+
+	/** Sets {@link #inTupleSerializer} from type argument 0 of {@code clazz}. */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
+		TupleTypeInfo<IN> inputType = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
+				function.getClass(), 0, null, null);
+		inTupleSerializer = new StreamRecordSerializer(inputType.createSerializer());
+	}
+
+	/**
+	 * Reuses the already configured output type as the input type; does nothing
+	 * when no output serialization delegate has been set up.
+	 */
+	@SuppressWarnings("unchecked")
+	protected void setSinkSerializer() {
+		if (outSerializationDelegate == null) {
+			return;
+		}
+		TupleTypeInfo<IN> inputType = (TupleTypeInfo<IN>) outTupleTypeInfo;
+		inTupleSerializer = new StreamRecordSerializer<IN>(inputType.createSerializer());
+	}
+
+	/**
+	 * Creates the input reader: a single record reader when fewer than two inputs
+	 * are configured, otherwise a union reader over one record reader per input.
+	 */
+	@SuppressWarnings("unchecked")
+	protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
+		int inputCount = configuration.getInteger("numberOfInputs", 0);
+		if (inputCount < 2) {
+			return new MutableRecordReader<IOReadableWritable>(this);
+		}
+
+		MutableRecordReader<IOReadableWritable>[] readers = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[inputCount];
+		for (int idx = 0; idx < readers.length; idx++) {
+			readers[idx] = new MutableRecordReader<IOReadableWritable>(this);
+		}
+		return new MutableUnionRecordReader<IOReadableWritable>(readers);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index b92e031..1b25285 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
-public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN> {
+public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 9f58842..f880470 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 1433ebc..5e3457c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
-public class StreamSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN> {
+public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
private static final Log LOG = LogFactory.getLog(StreamSink.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 108916a..12c9ba3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent<Tuple, OUT> {
+public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
private static final Log LOG = LogFactory.getLog(StreamSource.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 7da19aa..5032446 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
- AbstractStreamComponent<IN, OUT> {
+ SingleInputAbstractStreamComponent<IN, OUT> {
private static final Log LOG = LogFactory.getLog(StreamTask.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f436690f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
new file mode 100644
index 0000000..00ae3b9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that a coMap over two element streams applies {@code map1} to every
+ * element of the first stream and {@code map2} to every element of the second.
+ */
+public class CoMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	// static so that the user function, which is shipped to the task in serialized
+	// form, records into the same set the assertion below reads
+	private static Set<String> result = new HashSet<String>();
+	private static Set<String> expected = new HashSet<String>();
+
+	@Test
+	public void test() {
+		expected.add("a");
+		expected.add("b");
+		expected.add("c");
+		expected.add("1");
+		expected.add("2");
+		expected.add("3");
+		expected.add("4");
+
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<Tuple1<Integer>> ds1 = env.fromElements(1, 2, 3, 4);
+
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<Boolean>> ds2 = env.fromElements("a", "b", "c")
+				.coMapWith(new CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple1<Boolean>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple1<Boolean> map1(Tuple1<String> value) {
+						result.add(value.f0);
+						return new Tuple1<Boolean>(true);
+					}
+
+					@Override
+					public Tuple1<Boolean> map2(Tuple1<Integer> value) {
+						result.add(value.f0.toString());
+						return new Tuple1<Boolean>(false);
+					}
+				}, ds1).print();
+
+		env.executeTest(32);
+		// Compare as sets: HashSet.toArray() order depends on insertion order for
+		// colliding keys (e.g. "1" and "a"), so an array comparison is flaky.
+		Assert.assertEquals(expected, result);
+	}
+}