You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:40 UTC
[07/12] git commit: [streaming] Streaming jobgraph and vertex refactor to match recent runtime changes
[streaming] Streaming jobgraph and vertex refactor to match recent runtime changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/73371101
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/73371101
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/73371101
Branch: refs/heads/master
Commit: 73371101e9edd16f2823ce8af0b27283e3ed3264
Parents: 7cc2400
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Sep 21 22:58:18 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 519 ++++++++-----------
.../flink/streaming/api/StreamConfig.java | 25 +-
.../api/datastream/BatchedDataStream.java | 4 +-
.../streaming/api/datastream/DataStream.java | 30 +-
.../api/datastream/IterativeDataStream.java | 2 +-
.../environment/StreamExecutionEnvironment.java | 16 +-
.../streaming/api/invokable/SinkInvokable.java | 2 +-
.../api/invokable/SourceInvokable.java | 14 +-
.../api/invokable/StreamInvokable.java | 100 +++-
.../api/invokable/StreamOperatorInvokable.java | 119 -----
.../operator/BatchGroupReduceInvokable.java | 4 +-
.../operator/BatchReduceInvokable.java | 4 +-
.../invokable/operator/CounterInvokable.java | 4 +-
.../api/invokable/operator/FilterInvokable.java | 4 +-
.../invokable/operator/FlatMapInvokable.java | 4 +-
.../api/invokable/operator/MapInvokable.java | 4 +-
.../operator/StreamReduceInvokable.java | 4 +-
.../api/invokable/operator/co/CoInvokable.java | 17 +-
.../AbstractStreamComponent.java | 71 ---
.../api/streamcomponent/CoStreamTask.java | 114 ----
.../api/streamcomponent/InputHandler.java | 104 ----
.../api/streamcomponent/OutputHandler.java | 179 -------
.../StreamComponentException.java | 68 ---
.../streamcomponent/StreamIterationSink.java | 104 ----
.../streamcomponent/StreamIterationSource.java | 100 ----
.../api/streamcomponent/StreamSink.java | 61 ---
.../api/streamcomponent/StreamSource.java | 52 --
.../api/streamcomponent/StreamTask.java | 55 --
.../api/streamvertex/CoStreamVertex.java | 113 ++++
.../api/streamvertex/InputHandler.java | 109 ++++
.../api/streamvertex/OutputHandler.java | 181 +++++++
.../api/streamvertex/StreamIterationHead.java | 100 ++++
.../api/streamvertex/StreamIterationTail.java | 104 ++++
.../api/streamvertex/StreamVertex.java | 96 ++++
.../api/streamvertex/StreamVertexException.java | 68 +++
.../api/collector/StreamCollectorTest.java | 6 +-
.../api/streamcomponent/MockRecordWriter.java | 45 --
.../streamcomponent/StreamComponentTest.java | 197 -------
.../api/streamvertex/MockRecordWriter.java | 45 ++
.../api/streamvertex/StreamVertexTest.java | 184 +++++++
.../flink/streaming/util/MockInvokable.java | 4 +-
.../streaming/util/MockRecordWriterFactory.java | 2 +-
.../examples/window/join/WindowJoinLocal.java | 4 +-
43 files changed, 1399 insertions(+), 1643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d8d4f2d..a04dbaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -19,32 +19,29 @@ package org.apache.flink.streaming.api;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
-import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
-import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
-import org.apache.flink.streaming.api.streamcomponent.StreamSink;
-import org.apache.flink.streaming.api.streamcomponent.StreamSource;
-import org.apache.flink.streaming.api.streamcomponent.StreamTask;
+import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
+import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Object for building Apache Flink stream processing job graphs
@@ -55,8 +52,8 @@ public class JobGraphBuilder {
private final JobGraph jobGraph;
// Graph attributes
- private Map<String, AbstractJobVertex> components;
- private Map<String, Integer> componentParallelism;
+ private Map<String, AbstractJobVertex> streamVertices;
+ private Map<String, Integer> vertexParallelism;
private Map<String, Long> bufferTimeout;
private Map<String, List<String>> outEdgeList;
private Map<String, List<Integer>> outEdgeType;
@@ -66,17 +63,17 @@ public class JobGraphBuilder {
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<?>>> connectionTypes;
private Map<String, String> operatorNames;
- private Map<String, StreamInvokable<?>> invokableObjects;
+ private Map<String, StreamInvokable<?, ?>> invokableObjects;
private Map<String, TypeSerializerWrapper<?>> typeWrapperIn1;
private Map<String, TypeSerializerWrapper<?>> typeWrapperIn2;
private Map<String, TypeSerializerWrapper<?>> typeWrapperOut1;
private Map<String, TypeSerializerWrapper<?>> typeWrapperOut2;
private Map<String, byte[]> serializedFunctions;
private Map<String, byte[]> outputSelectors;
- private Map<String, Class<? extends AbstractInvokable>> componentClasses;
+ private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
private Map<String, String> iterationIds;
- private Map<String, String> iterationIDtoSourceName;
- private Map<String, String> iterationIDtoSinkName;
+ private Map<String, String> iterationIDtoHeadName;
+ private Map<String, String> iterationIDtoTailName;
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
@@ -86,7 +83,6 @@ public class JobGraphBuilder {
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
* and consists of sources, tasks (intermediate vertices) and sinks. A
- * JobGraph must contain at least a source and a sink.
*
* @param jobGraphName
* Name of the JobGraph
@@ -95,8 +91,8 @@ public class JobGraphBuilder {
jobGraph = new JobGraph(jobGraphName);
- components = new HashMap<String, AbstractJobVertex>();
- componentParallelism = new HashMap<String, Integer>();
+ streamVertices = new HashMap<String, AbstractJobVertex>();
+ vertexParallelism = new HashMap<String, Integer>();
bufferTimeout = new HashMap<String, Long>();
outEdgeList = new HashMap<String, List<String>>();
outEdgeType = new HashMap<String, List<Integer>>();
@@ -106,17 +102,17 @@ public class JobGraphBuilder {
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
operatorNames = new HashMap<String, String>();
- invokableObjects = new HashMap<String, StreamInvokable<?>>();
+ invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
typeWrapperIn1 = new HashMap<String, TypeSerializerWrapper<?>>();
typeWrapperIn2 = new HashMap<String, TypeSerializerWrapper<?>>();
typeWrapperOut1 = new HashMap<String, TypeSerializerWrapper<?>>();
typeWrapperOut2 = new HashMap<String, TypeSerializerWrapper<?>>();
serializedFunctions = new HashMap<String, byte[]>();
outputSelectors = new HashMap<String, byte[]>();
- componentClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
+ vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
iterationIds = new HashMap<String, String>();
- iterationIDtoSourceName = new HashMap<String, String>();
- iterationIDtoSinkName = new HashMap<String, String>();
+ iterationIDtoHeadName = new HashMap<String, String>();
+ iterationIDtoTailName = new HashMap<String, String>();
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
@@ -142,12 +138,16 @@ public class JobGraphBuilder {
}
/**
- * Adds source to the JobGraph with the given parameters
+ * Adds a vertex to the streaming JobGraph with the given parameters
*
- * @param componentName
- * Name of the component
- * @param InvokableObject
+ * @param vertexName
+ * Name of the vertex
+ * @param invokableObject
* User defined operator
+ * @param inTypeWrapper
+ * Input type wrapper for serialization
+ * @param outTypeWrapper
+ * Output type wrapper for serialization
* @param operatorName
* Operator type
* @param serializedFunction
@@ -155,25 +155,27 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
- public void addSource(String componentName, SourceInvokable<?> InvokableObject,
+ public <IN, OUT> void addStreamVertex(String vertexName,
+ StreamInvokable<IN, OUT> invokableObject, TypeSerializerWrapper<?> inTypeWrapper,
TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
- addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
+ addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
serializedFunction, parallelism);
- addTypeWrappers(componentName, null, null, outTypeWrapper, null);
+
+ addTypeWrappers(vertexName, inTypeWrapper, null, outTypeWrapper, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE: {}", componentName);
+ LOG.debug("Vertex: {}", vertexName);
}
}
/**
- * Adds a source to the iteration head to the {@link JobGraph}. The iterated
- * tuples will be fed from this component back to the graph.
+ * Adds a vertex for the iteration head to the {@link JobGraph}. The
+ * iterated values will be fed from this vertex back to the graph.
*
- * @param componentName
- * Name of the component
+ * @param vertexName
+ * Name of the vertex
* @param iterationHead
* Id of the iteration head
* @param iterationID
@@ -183,143 +185,82 @@ public class JobGraphBuilder {
* @param waitTime
* Max wait time for next record
*/
- public void addIterationSource(String componentName, String iterationHead, String iterationID,
+ public void addIterationHead(String vertexName, String iterationHead, String iterationID,
int parallelism, long waitTime) {
- addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
+ addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
- iterationIds.put(componentName, iterationID);
- iterationIDtoSourceName.put(iterationID, componentName);
+ iterationIds.put(vertexName, iterationID);
+ iterationIDtoHeadName.put(iterationID, vertexName);
- setBytesFrom(iterationHead, componentName);
+ setBytesFrom(iterationHead, vertexName);
- setEdge(componentName, iterationHead,
- connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0,
- new ArrayList<String>(), false);
+ setEdge(vertexName, iterationHead, connectionTypes
+ .get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(),
+ false);
- iterationWaitTime.put(iterationIDtoSourceName.get(iterationID), waitTime);
+ iterationWaitTime.put(iterationIDtoHeadName.get(iterationID), waitTime);
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SOURCE: {}", componentName);
+ LOG.debug("ITERATION SOURCE: {}", vertexName);
}
}
/**
- * Adds a task to the JobGraph with the given parameters
+ * Adds a vertex for the iteration tail to the {@link JobGraph}. The values
+ * intended to be iterated will be sent to this sink from the iteration
+ * head.
*
- * @param componentName
- * Name of the component
- * @param taskInvokableObject
- * User defined operator
- * @param inTypeWrapper
- * Input type wrapper for serialization
- * @param outTypeWrapper
- * Output type wrapper for serialization
- * @param operatorName
- * Operator type
- * @param serializedFunction
- * Serialized udf
+ * @param vertexName
+ * Name of the vertex
+ * @param iterationTail
+ * Id of the iteration tail
+ * @param iterationID
+ * ID of iteration for mulitple iterations
* @param parallelism
* Number of parallel instances created
+ * @param waitTime
+ * Max waiting time for next record
*/
- public <IN, OUT> void addTask(String componentName,
- StreamOperatorInvokable<IN, OUT> taskInvokableObject,
- TypeSerializerWrapper<?> inTypeWrapper, TypeSerializerWrapper<?> outTypeWrapper,
- String operatorName, byte[] serializedFunction, int parallelism) {
+ public void addIterationTail(String vertexName, String iterationTail, String iterationID,
+ int parallelism, long waitTime) {
- addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
- serializedFunction, parallelism);
+ addVertex(vertexName, StreamIterationTail.class, null, null, null, parallelism);
+
+ iterationIds.put(vertexName, iterationID);
+ iterationIDtoTailName.put(iterationID, vertexName);
- addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper, null);
+ setBytesFrom(iterationTail, vertexName);
+ iterationWaitTime.put(iterationIDtoTailName.get(iterationID), waitTime);
if (LOG.isDebugEnabled()) {
- LOG.debug("TASK: {}", componentName);
+ LOG.debug("ITERATION SINK: {}", vertexName);
}
+
}
- public <IN1, IN2, OUT> void addCoTask(String componentName,
+ public <IN1, IN2, OUT> void addCoTask(String vertexName,
CoInvokable<IN1, IN2, OUT> taskInvokableObject,
TypeSerializerWrapper<?> in1TypeWrapper, TypeSerializerWrapper<?> in2TypeWrapper,
TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
- addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
- serializedFunction, parallelism);
-
- addTypeWrappers(componentName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CO-TASK: {}", componentName);
- }
- }
-
- /**
- * Adds sink to the JobGraph with the given parameters
- *
- * @param componentName
- * Name of the component
- * @param InvokableObject
- * User defined operator
- * @param operatorName
- * Operator type
- * @param serializedFunction
- * Serialized udf
- * @param parallelism
- * Number of parallel instances created
- */
- public void addSink(String componentName, SinkInvokable<?> InvokableObject,
- TypeSerializerWrapper<?> inTypeWrapper, String operatorName, byte[] serializedFunction,
- int parallelism) {
-
- addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
+ addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
serializedFunction, parallelism);
- addTypeWrappers(componentName, inTypeWrapper, null, null, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SINK: {}", componentName);
- }
-
- }
-
- /**
- * Adds a sink to an iteration tail to the {@link JobGraph}. The tuples
- * intended to be iterated will be sent to this sink from the iteration
- * head.
- *
- * @param componentName
- * Name of the component
- * @param iterationTail
- * Id of the iteration tail
- * @param iterationID
- * ID of iteration for mulitple iterations
- * @param parallelism
- * Number of parallel instances created
- * @param waitTime
- * Max waiting time for next record
- */
- public void addIterationSink(String componentName, String iterationTail, String iterationID,
- int parallelism, long waitTime) {
-
- addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
-
- iterationIds.put(componentName, iterationID);
- iterationIDtoSinkName.put(iterationID, componentName);
- setBytesFrom(iterationTail, componentName);
- iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
+ addTypeWrappers(vertexName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SINK: {}", componentName);
+ LOG.debug("CO-TASK: {}", vertexName);
}
-
}
/**
- * Sets component parameters in the JobGraph
+ * Sets vertex parameters in the JobGraph
*
- * @param componentName
- * Name of the component
- * @param componentClass
+ * @param vertexName
+ * Name of the vertex
+ * @param vertexClass
* The class of the vertex
* @param invokableObject
* The user defined invokable object
@@ -330,114 +271,153 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
- private void addComponent(String componentName,
- Class<? extends AbstractInvokable> componentClass, StreamInvokable<?> invokableObject,
- String operatorName, byte[] serializedFunction, int parallelism) {
-
- componentClasses.put(componentName, componentClass);
- setParallelism(componentName, parallelism);
- mutability.put(componentName, false);
- invokableObjects.put(componentName, invokableObject);
- operatorNames.put(componentName, operatorName);
- serializedFunctions.put(componentName, serializedFunction);
- outEdgeList.put(componentName, new ArrayList<String>());
- outEdgeType.put(componentName, new ArrayList<Integer>());
- outEdgeNames.put(componentName, new ArrayList<List<String>>());
- outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
- inEdgeList.put(componentName, new ArrayList<String>());
- connectionTypes.put(componentName, new ArrayList<StreamPartitioner<?>>());
- iterationTailCount.put(componentName, 0);
+ private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass,
+ StreamInvokable<?, ?> invokableObject, String operatorName, byte[] serializedFunction,
+ int parallelism) {
+
+ vertexClasses.put(vertexName, vertexClass);
+ setParallelism(vertexName, parallelism);
+ mutability.put(vertexName, false);
+ invokableObjects.put(vertexName, invokableObject);
+ operatorNames.put(vertexName, operatorName);
+ serializedFunctions.put(vertexName, serializedFunction);
+ outEdgeList.put(vertexName, new ArrayList<String>());
+ outEdgeType.put(vertexName, new ArrayList<Integer>());
+ outEdgeNames.put(vertexName, new ArrayList<List<String>>());
+ outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>());
+ inEdgeList.put(vertexName, new ArrayList<String>());
+ connectionTypes.put(vertexName, new ArrayList<StreamPartitioner<?>>());
+ iterationTailCount.put(vertexName, 0);
}
- private void addTypeWrappers(String componentName, TypeSerializerWrapper<?> in1,
+ private void addTypeWrappers(String vertexName, TypeSerializerWrapper<?> in1,
TypeSerializerWrapper<?> in2, TypeSerializerWrapper<?> out1,
TypeSerializerWrapper<?> out2) {
- typeWrapperIn1.put(componentName, in1);
- typeWrapperIn2.put(componentName, in2);
- typeWrapperOut1.put(componentName, out1);
- typeWrapperOut2.put(componentName, out2);
+ typeWrapperIn1.put(vertexName, in1);
+ typeWrapperIn2.put(vertexName, in2);
+ typeWrapperOut1.put(vertexName, out1);
+ typeWrapperOut2.put(vertexName, out2);
}
/**
* Creates an {@link AbstractJobVertex} in the {@link JobGraph} and sets its
* config parameters using the ones set previously.
*
- * @param componentName
- * Name of the component for which the vertex will be created.
+ * @param vertexName
+ * Name for which the vertex will be created.
*/
- private void createVertex(String componentName) {
+ private void createVertex(String vertexName) {
// Get vertex attributes
- Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
- StreamInvokable<?> invokableObject = invokableObjects.get(componentName);
- String operatorName = operatorNames.get(componentName);
- byte[] serializedFunction = serializedFunctions.get(componentName);
- int parallelism = componentParallelism.get(componentName);
- byte[] outputSelector = outputSelectors.get(componentName);
+ Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
+ StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
+ String operatorName = operatorNames.get(vertexName);
+ byte[] serializedFunction = serializedFunctions.get(vertexName);
+ int parallelism = vertexParallelism.get(vertexName);
+ byte[] outputSelector = outputSelectors.get(vertexName);
// Create vertex object
- AbstractJobVertex component = new AbstractJobVertex(componentName);
+ AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
- this.jobGraph.addVertex(component);
+ this.jobGraph.addVertex(vertex);
- component.setInvokableClass(componentClass);
- component.setParallelism(parallelism);
+ vertex.setInvokableClass(vertexClass);
+ vertex.setParallelism(parallelism);
if (LOG.isDebugEnabled()) {
- LOG.debug("Parallelism set: {} for {}", parallelism, componentName);
+ LOG.debug("Parallelism set: {} for {}", parallelism, vertexName);
}
- StreamConfig config = new StreamConfig(component.getConfiguration());
+ StreamConfig config = new StreamConfig(vertex.getConfiguration());
- config.setMutability(mutability.get(componentName));
- config.setBufferTimeout(bufferTimeout.get(componentName));
+ config.setMutability(mutability.get(vertexName));
+ config.setBufferTimeout(bufferTimeout.get(vertexName));
- config.setTypeWrapperIn1(typeWrapperIn1.get(componentName));
- config.setTypeWrapperIn2(typeWrapperIn2.get(componentName));
- config.setTypeWrapperOut1(typeWrapperOut1.get(componentName));
- config.setTypeWrapperOut2(typeWrapperOut2.get(componentName));
+ config.setTypeWrapperIn1(typeWrapperIn1.get(vertexName));
+ config.setTypeWrapperIn2(typeWrapperIn2.get(vertexName));
+ config.setTypeWrapperOut1(typeWrapperOut1.get(vertexName));
+ config.setTypeWrapperOut2(typeWrapperOut2.get(vertexName));
// Set vertex config
config.setUserInvokable(invokableObject);
- config.setComponentName(componentName);
+ config.setVertexName(vertexName);
config.setFunction(serializedFunction, operatorName);
config.setOutputSelector(outputSelector);
- if (componentClass.equals(StreamIterationSource.class)
- || componentClass.equals(StreamIterationSink.class)) {
- config.setIterationId(iterationIds.get(componentName));
- config.setIterationWaitTime(iterationWaitTime.get(componentName));
+ if (vertexClass.equals(StreamIterationHead.class)
+ || vertexClass.equals(StreamIterationTail.class)) {
+ config.setIterationId(iterationIds.get(vertexName));
+ config.setIterationWaitTime(iterationWaitTime.get(vertexName));
+ }
+
+ streamVertices.put(vertexName, vertex);
+ }
+
+ /**
+ * Connects two vertices with the given names, partitioning and channel type
+ *
+ * @param upStreamVertexName
+ * Name of the upstream vertex, that will emit the values
+ * @param downStreamVertexName
+ * Name of the downstream vertex, that will receive the values
+ * @param partitionerObject
+ * The partitioner
+ */
+ private <T> void connect(String upStreamVertexName, String downStreamVertexName,
+ StreamPartitioner<T> partitionerObject) {
+
+ AbstractJobVertex upStreamVertex = streamVertices.get(upStreamVertexName);
+ AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName);
+
+ StreamConfig config = new StreamConfig(upStreamVertex.getConfiguration());
+
+ if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
+ downStreamVertex
+ .connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
+ } else {
+ downStreamVertex
+ .connectNewDataSetAsInput(upStreamVertex, DistributionPattern.BIPARTITE);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
+ upStreamVertexName, downStreamVertexName);
}
- components.put(componentName, component);
+ int outputIndex = upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1;
+
+ config.setOutputName(outputIndex, outEdgeNames.get(upStreamVertexName).get(outputIndex));
+ config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamVertexName).get(outputIndex));
+ config.setPartitioner(outputIndex, partitionerObject);
+ config.setNumberOfOutputChannels(outputIndex, vertexParallelism.get(downStreamVertexName));
}
/**
- * Sets the number of parallel instances created for the given component.
+ * Sets the number of parallel instances created for the given vertex.
*
- * @param componentName
- * Name of the component
+ * @param vertexName
+ * Name of the vertex
* @param parallelism
* Number of parallel instances created
*/
- public void setParallelism(String componentName, int parallelism) {
- componentParallelism.put(componentName, parallelism);
+ public void setParallelism(String vertexName, int parallelism) {
+ vertexParallelism.put(vertexName, parallelism);
}
- public void setMutability(String componentName, boolean isMutable) {
- mutability.put(componentName, isMutable);
+ public void setMutability(String vertexName, boolean isMutable) {
+ mutability.put(vertexName, isMutable);
}
- public void setBufferTimeout(String componentName, long bufferTimeout) {
- this.bufferTimeout.put(componentName, bufferTimeout);
+ public void setBufferTimeout(String vertexName, long bufferTimeout) {
+ this.bufferTimeout.put(vertexName, bufferTimeout);
}
/**
* Connects two vertices in the JobGraph using the selected partitioner
* settings
*
- * @param upStreamComponentName
+ * @param upStreamVertexName
* Name of the upstream(output) vertex
- * @param downStreamComponentName
+ * @param downStreamVertexName
* Name of the downstream(input) vertex
* @param partitionerObject
* Partitioner object
@@ -446,55 +426,15 @@ public class JobGraphBuilder {
* @param outputNames
* User defined names of the out edge
*/
- public void setEdge(String upStreamComponentName, String downStreamComponentName,
+ public void setEdge(String upStreamVertexName, String downStreamVertexName,
StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames,
boolean selectAll) {
- outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
- outEdgeType.get(upStreamComponentName).add(typeNumber);
- inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
- connectionTypes.get(upStreamComponentName).add(partitionerObject);
- outEdgeNames.get(upStreamComponentName).add(outputNames);
- outEdgeSelectAll.get(upStreamComponentName).add(selectAll);
- }
-
- /**
- * Connects to JobGraph components with the given names, partitioning and
- * channel type
- *
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the tuples
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the tuples
- * @param partitionerObject
- * The partitioner
- */
- private <T> void connect(String upStreamComponentName, String downStreamComponentName,
- StreamPartitioner<T> partitionerObject) {
-
- AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
- AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
-
- StreamConfig config = new StreamConfig(upStreamComponent.getConfiguration());
-
- if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
- downStreamComponent.connectNewDataSetAsInput(upStreamComponent, DistributionPattern.POINTWISE);
- } else {
- downStreamComponent.connectNewDataSetAsInput(upStreamComponent, DistributionPattern.BIPARTITE);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
- upStreamComponentName, downStreamComponentName);
- }
-
- int outputIndex = upStreamComponent.getNumberOfProducedIntermediateDataSets() - 1;
-
- config.setOutputName(outputIndex, outEdgeNames.get(upStreamComponentName).get(outputIndex));
- config.setSelectAll(outputIndex,
- outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
- config.setPartitioner(outputIndex, partitionerObject);
- config.setNumberOfOutputChannels(outputIndex,
- componentParallelism.get(downStreamComponentName));
+ outEdgeList.get(upStreamVertexName).add(downStreamVertexName);
+ outEdgeType.get(upStreamVertexName).add(typeNumber);
+ inEdgeList.get(downStreamVertexName).add(upStreamVertexName);
+ connectionTypes.get(upStreamVertexName).add(partitionerObject);
+ outEdgeNames.get(upStreamVertexName).add(outputNames);
+ outEdgeSelectAll.get(upStreamVertexName).add(selectAll);
}
/**
@@ -507,33 +447,31 @@ public class JobGraphBuilder {
* ID of the iteration tail
*/
public void setIterationSourceSettings(String iterationID, String iterationTail) {
- setParallelism(iterationIDtoSourceName.get(iterationID),
- componentParallelism.get(iterationTail));
- setBufferTimeout(iterationIDtoSourceName.get(iterationID), bufferTimeout.get(iterationTail));
+ setParallelism(iterationIDtoHeadName.get(iterationID), vertexParallelism.get(iterationTail));
+ setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeout.get(iterationTail));
}
/**
- * Sets a user defined {@link OutputSelector} for the given component. Used
- * for directed emits.
+ * Sets a user defined {@link OutputSelector} for the given vertex. Used for
+ * directed emits.
*
- * @param componentName
- * Name of the component for which the output selector will be
- * set
+ * @param vertexName
+ * Name of the vertex for which the output selector will be set
* @param serializedOutputSelector
* Byte array representing the serialized output selector.
*/
- public <T> void setOutputSelector(String componentName, byte[] serializedOutputSelector) {
- outputSelectors.put(componentName, serializedOutputSelector);
+ public <T> void setOutputSelector(String vertexName, byte[] serializedOutputSelector) {
+ outputSelectors.put(vertexName, serializedOutputSelector);
if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for {}", componentName);
+ LOG.debug("Outputselector set for {}", vertexName);
}
}
/**
- * Sets udf operator and TypeSerializerWrapper from one component to
- * another, used with some sinks.
+ * Sets udf operator and TypeSerializerWrapper from one vertex to another,
+ * used with some sinks.
*
* @param from
* from
@@ -559,39 +497,32 @@ public class JobGraphBuilder {
return typeWrapperOut1.get(id).getTypeInfo();
}
-// TODO: This should be adjusted to the sharing groups
-// /**
-// * Sets instance sharing between the given components
-// *
-// * @param component1
-// * Share will be called on this component
-// * @param component2
-// * Share will be called to this component
-// */
-// public void setInstanceSharing(String component1, String component2) {
-// AbstractJobVertex c1 = components.get(component1);
-// AbstractJobVertex c2 = components.get(component2);
-//
-// c1.setVertexToShareInstancesWith(c2);
-// }
-
/**
- * Sets all components to share with the one with highest parallelism
+ * Sets slot sharing for the vertices.
*/
- private void setAutomaticInstanceSharing() {
+ private void setSlotSharing() {
SlotSharingGroup shareGroup = new SlotSharingGroup();
- for (AbstractJobVertex vertex : components.values()) {
+ for (AbstractJobVertex vertex : streamVertices.values()) {
vertex.setSlotSharingGroup(shareGroup);
}
+
+ for (String iterID : new HashSet<String>(iterationIds.values())) {
+ CoLocationGroup ccg = new CoLocationGroup();
+ AbstractJobVertex tail = streamVertices.get(iterationIDtoTailName.get(iterID));
+ AbstractJobVertex head = streamVertices.get(iterationIDtoHeadName.get(iterID));
+
+ ccg.addVertex(head);
+ ccg.addVertex(tail);
+ }
}
/**
* Writes number of inputs into each JobVertex's config
*/
private void setNumberOfJobInputs() {
- for (AbstractJobVertex component : components.values()) {
- (new StreamConfig(component.getConfiguration())).setNumberOfInputs(component
+ for (AbstractJobVertex vertex : streamVertices.values()) {
+ (new StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex
.getNumberOfInputs());
}
}
@@ -601,43 +532,43 @@ public class JobGraphBuilder {
* config
*/
private void setNumberOfJobOutputs() {
- for (AbstractJobVertex component : components.values()) {
- (new StreamConfig(component.getConfiguration())).setNumberOfOutputs(component
+ for (AbstractJobVertex vertex : streamVertices.values()) {
+ (new StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex
.getNumberOfProducedIntermediateDataSets());
}
}
/**
- * Builds the {@link JobGraph} from the components with the edges and
- * settings provided.
+ * Builds the {@link JobGraph} from the vertices with the edges and settings
+ * provided.
*/
private void buildGraph() {
- for (String componentName : outEdgeList.keySet()) {
- createVertex(componentName);
+ for (String vertexName : outEdgeList.keySet()) {
+ createVertex(vertexName);
}
- for (String upStreamComponentName : outEdgeList.keySet()) {
+ for (String upStreamVertexName : outEdgeList.keySet()) {
int i = 0;
- List<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
+ List<Integer> outEdgeTypeList = outEdgeType.get(upStreamVertexName);
- for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
- StreamConfig downStreamComponentConfig = new StreamConfig(components.get(
- downStreamComponentName).getConfiguration());
+ for (String downStreamVertexName : outEdgeList.get(upStreamVertexName)) {
+ StreamConfig downStreamVertexConfig = new StreamConfig(streamVertices.get(
+ downStreamVertexName).getConfiguration());
- int inputNumber = downStreamComponentConfig.getNumberOfInputs();
+ int inputNumber = downStreamVertexConfig.getNumberOfInputs();
- downStreamComponentConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
- downStreamComponentConfig.setNumberOfInputs(inputNumber);
+ downStreamVertexConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
+ downStreamVertexConfig.setNumberOfInputs(inputNumber);
- connect(upStreamComponentName, downStreamComponentName,
- connectionTypes.get(upStreamComponentName).get(i));
+ connect(upStreamVertexName, downStreamVertexName,
+ connectionTypes.get(upStreamVertexName).get(i));
i++;
}
}
- setAutomaticInstanceSharing();
+ setSlotSharing();
setNumberOfJobInputs();
setNumberOfJobOutputs();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 2c53fb3..6fac391 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
+import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
@@ -46,7 +46,7 @@ public class StreamConfig {
private static final String DIRECTED_EMIT = "directedEmit";
private static final String FUNCTION_NAME = "operatorName";
private static final String FUNCTION = "operator";
- private static final String COMPONENT_NAME = "componentName";
+ private static final String VERTEX_NAME = "vertexName";
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
private static final String BUFFER_TIMEOUT = "bufferTimeout";
@@ -125,8 +125,13 @@ public class StreamConfig {
TypeSerializerWrapper<T> typeWrapper = (TypeSerializerWrapper<T>) SerializationUtils
.deserialize(serializedWrapper);
+ if (typeWrapper != null) {
+ return typeWrapper.getTypeInfo();
+
+ } else {
+ return null;
+ }
- return typeWrapper.getTypeInfo();
}
public void setMutability(boolean isMutable) {
@@ -145,7 +150,7 @@ public class StreamConfig {
return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
}
- public void setUserInvokable(StreamInvokable<?> invokableObject) {
+ public void setUserInvokable(StreamInvokable<?,?> invokableObject) {
if (invokableObject != null) {
config.setClass(USER_FUNCTION, invokableObject.getClass());
@@ -162,16 +167,16 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
- throw new StreamComponentException("Cannot instantiate user function", e);
+ throw new StreamVertexException("Cannot instantiate user function", e);
}
}
- public void setComponentName(String componentName) {
- config.setString(COMPONENT_NAME, componentName);
+ public void setVertexName(String vertexName) {
+ config.setString(VERTEX_NAME, vertexName);
}
- public String getComponentName() {
- return config.getString(COMPONENT_NAME, null);
+ public String getVertexName() {
+ return config.getString(VERTEX_NAME, null);
}
public void setFunction(byte[] serializedFunction, String functionName) {
@@ -212,7 +217,7 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
} catch (Exception e) {
- throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+ throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector",
e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 2258d4a..51f1467 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
@@ -200,7 +200,7 @@ public class BatchedDataStream<OUT> {
}
private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
- StreamOperatorInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
+ StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ace76aa..23bc80d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
@@ -558,7 +558,7 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> max() {
return max(0);
}
-
+
/**
* Applies an aggregation that gives the count of the data point.
*
@@ -568,15 +568,16 @@ public class DataStream<OUT> {
TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
- return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable<OUT>());
+ return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
+ new CounterInvokable<OUT>());
}
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
- SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
- outTypeWrapper, invokable);
+ SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
+ outTypeWrapper, outTypeWrapper, invokable);
return returnStream;
}
@@ -759,7 +760,8 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
+ inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -909,7 +911,8 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
+ inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -944,7 +947,7 @@ public class DataStream<OUT> {
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
- jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+ jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
return this.copy();
@@ -966,15 +969,14 @@ public class DataStream<OUT> {
*/
protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
final Function function, TypeSerializerWrapper<OUT> inTypeWrapper,
- TypeSerializerWrapper<R> outTypeWrapper,
- StreamOperatorInvokable<OUT, R> functionInvokable) {
+ TypeSerializerWrapper<R> outTypeWrapper, StreamInvokable<OUT, R> functionInvokable) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
functionName, outTypeWrapper);
try {
- jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
outTypeWrapper, functionName,
SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
} catch (SerializationException e) {
@@ -1049,13 +1051,13 @@ public class DataStream<OUT> {
}
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
- SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
+ SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> inTypeWrapper) {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
outTypeWrapper);
try {
- jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
- typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+ inTypeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction),
degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SinkFunction");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index e619f36..41616c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -82,7 +82,7 @@ public class IterativeDataStream<IN> extends
public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
- jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
+ jobGraphBuilder.addIterationTail(returnStream.getId(), iterationTail.getId(),
iterationID.toString(), iterationTail.getParallelism(), waitTime);
jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 00f9082..c3231d7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -230,8 +230,8 @@ public abstract class StreamExecutionEnvironment {
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+ null, outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
@@ -267,8 +267,8 @@ public abstract class StreamExecutionEnvironment {
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(
- new FromElementsFunction<OUT>(data)), new ObjectTypeWrapper<OUT>(data
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
+ new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
.iterator().next()), "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
@@ -311,8 +311,9 @@ public abstract class StreamExecutionEnvironment {
outTypeWrapper);
try {
- jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- outTypeWrapper, "source", SerializationUtils.serialize(function), parallelism);
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+ null, outTypeWrapper, "source", SerializationUtils.serialize(function),
+ parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
@@ -454,7 +455,8 @@ public abstract class StreamExecutionEnvironment {
* <p>
* The program execution will be logged and displayed with a generated
* default name.
- * @throws Exception
+ *
+ * @throws Exception
**/
public abstract void execute() throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 4c7138c..ec33224 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public class SinkInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
private SinkFunction<IN> sinkFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 30c86f9..8c9df46 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -38,4 +38,16 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serial
sourceFunction.invoke(collector);
}
+ @Override
+ protected void immutableInvoke() throws Exception {
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception {
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 4f0e138..b3cd57f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -22,18 +22,30 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The StreamInvokable represents the base class for all invokables in
- * the streaming topology.
+ * The StreamInvokable represents the base class for all invokables in the
+ * streaming topology.
*
* @param <OUT>
* The output type of the invokable
*/
-public abstract class StreamInvokable<OUT> implements Serializable {
+public abstract class StreamInvokable<IN, OUT> implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
+
+ protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+ protected StreamRecordSerializer<IN> serializer;
+ protected StreamRecord<IN> reuse;
+ protected boolean isMutable;
protected Collector<OUT> collector;
protected Function userFunction;
@@ -43,8 +55,79 @@ public abstract class StreamInvokable<OUT> implements Serializable {
this.userFunction = userFunction;
}
- public void setCollector(Collector<OUT> collector) {
+ /**
+ * Initializes the {@link StreamOperatorInvokable} for input and output
+ * handling
+ *
+ * @param collector
+ * Collector object for collecting the outputs for the operator
+ * @param recordIterator
+ * Iterator for reading in the input records
+ * @param serializer
+ * Serializer used to deserialize inputs
+ * @param isMutable
+ * Mutability setting for the operator
+ */
+ public void initialize(Collector<OUT> collector,
+ MutableObjectIterator<StreamRecord<IN>> recordIterator,
+ StreamRecordSerializer<IN> serializer, boolean isMutable) {
this.collector = collector;
+ this.recordIterator = recordIterator;
+ this.serializer = serializer;
+ if(this.serializer != null){
+ this.reuse = serializer.createInstance();
+ }
+ this.isMutable = isMutable;
+ }
+
+ /**
+ * Re-initializes the object in which the next input record will be read in
+ */
+ protected void resetReuse() {
+ this.reuse = serializer.createInstance();
+ }
+
+ /**
+ * Method that will be called if the mutability setting is set to immutable
+ */
+ protected abstract void immutableInvoke() throws Exception;
+
+ /**
+ * Method that will be called if the mutability setting is set to mutable
+ */
+ protected abstract void mutableInvoke() throws Exception;
+
+ /**
+ * The call of the user implemented function should be implemented here
+ */
+ protected abstract void callUserFunction() throws Exception;
+
+ /**
+ * Method for logging exceptions thrown during the user function call
+ */
+ protected void callUserFunctionAndLogException() {
+ try {
+ callUserFunction();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ /**
+ * Method that will be called when the stream starts. The user should encode
+ * the processing functionality in {@link #mutableInvoke()} and
+ * {@link #immutableInvoke()}
+ *
+ */
+ public void invoke() throws Exception {
+ if (this.isMutable) {
+ mutableInvoke();
+ } else {
+ immutableInvoke();
+ }
}
/**
@@ -55,7 +138,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
* The configuration parameters for the operator
*/
public void open(Configuration parameters) throws Exception {
- isRunning=true;
+ isRunning = true;
if (userFunction instanceof RichFunction) {
((RichFunction) userFunction).open(parameters);
}
@@ -72,11 +155,4 @@ public abstract class StreamInvokable<OUT> implements Serializable {
((RichFunction) userFunction).close();
}
}
-
- /**
- * The method that will be called once when the operator is created, the
- * working mechanics of the operator should be implemented here
- *
- */
- public abstract void invoke() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
deleted file mode 100644
index 239f9f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The StreamOperatorInvokable represents the base class for all operators in
- * the streaming topology.
- *
- * @param <IN>
- * Input type of the operator
- * @param <OUT>
- * Output type of the operator
- */
-public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<OUT> {
-
- public StreamOperatorInvokable(Function userFunction) {
- super(userFunction);
- }
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorInvokable.class);
-
- protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
- protected StreamRecordSerializer<IN> serializer;
- protected StreamRecord<IN> reuse;
- protected boolean isMutable;
-
- /**
- * Initializes the {@link StreamOperatorInvokable} for input and output
- * handling
- *
- * @param collector
- * Collector object for collecting the outputs for the operator
- * @param recordIterator
- * Iterator for reading in the input records
- * @param serializer
- * Serializer used to deserialize inputs
- * @param isMutable
- * Mutability setting for the operator
- */
- public void initialize(Collector<OUT> collector,
- MutableObjectIterator<StreamRecord<IN>> recordIterator,
- StreamRecordSerializer<IN> serializer, boolean isMutable) {
- setCollector(collector);
- this.recordIterator = recordIterator;
- this.serializer = serializer;
- this.reuse = serializer.createInstance();
- this.isMutable = isMutable;
- }
-
- /**
- * Re-initializes the object in which the next input record will be read in
- */
- protected void resetReuse() {
- this.reuse = serializer.createInstance();
- }
-
- /**
- * Method that will be called if the mutability setting is set to immutable
- */
- protected abstract void immutableInvoke() throws Exception;
-
- /**
- * Method that will be called if the mutability setting is set to mutable
- */
- protected abstract void mutableInvoke() throws Exception;
-
- /**
- * The call of the user implemented function should be implemented here
- */
- protected abstract void callUserFunction() throws Exception;
-
- /**
- * Method for logging exceptions thrown during the user function call
- */
- protected void callUserFunctionAndLogException() {
- try {
- callUserFunction();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Calling user function failed due to: {}",
- StringUtils.stringifyException(e));
- }
- }
- }
-
- @Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- mutableInvoke();
- } else {
- immutableInvoke();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index 2e64d7c..81498dc 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -24,11 +24,11 @@ import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.SlidingWindowState;
-public class BatchGroupReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 41ae3f3..f306dac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -24,11 +24,11 @@ import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;
-public class BatchReduceInvokable<OUT> extends StreamOperatorInvokable<OUT, OUT> {
+public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<OUT> reducer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 12f43e1..7924595 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -17,9 +17,9 @@
package org.apache.flink.streaming.api.invokable.operator;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
-public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> {
+public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
private static final long serialVersionUID = 1L;
Long count = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 9e5b18a..a54b6ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
-public class FilterInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 1a0c93e..3452a82 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
-public class FlatMapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private FlatMapFunction<IN, OUT> flatMapper;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 40142ed..4feb4f3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
-public class MapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private MapFunction<IN, OUT> mapper;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 7080a7f..d327c76 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
-public class StreamReduceInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<IN> reducer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 29e7650..098bbc6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
+public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
public CoInvokable(Function userFunction) {
super(userFunction);
@@ -41,7 +41,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
protected StreamRecord<IN2> reuse2;
protected StreamRecordSerializer<IN1> serializer1;
protected StreamRecordSerializer<IN2> serializer2;
- protected boolean isMutable;
public void initialize(Collector<OUT> collector,
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
@@ -71,14 +70,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
this.reuse2 = serializer2.createInstance();
}
- public void invoke() throws Exception {
- if (this.isMutable) {
- mutableInvoke();
- } else {
- immutableInvoke();
- }
- }
-
+ @Override
protected void immutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
@@ -96,6 +88,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
}
+ @Override
protected void mutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
@@ -149,4 +142,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
deleted file mode 100644
index 03c3988..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-
-public abstract class AbstractStreamComponent extends AbstractInvokable {
-
- protected StreamConfig configuration;
- protected int instanceID;
- protected String name;
- private static int numComponents = 0;
- protected boolean isMutable;
- protected Object function;
- protected String functionName;
-
- protected static int newComponent() {
- numComponents++;
- return numComponents;
- }
-
- @Override
- public void registerInputOutput() {
- initialize();
- setInputsOutputs();
- setInvokable();
- }
-
- protected void initialize() {
- this.configuration = new StreamConfig(getTaskConfiguration());
- this.name = configuration.getComponentName();
- this.isMutable = configuration.getMutability();
- this.functionName = configuration.getFunctionName();
- this.function = configuration.getFunction();
- }
-
- protected <T> void invokeUserFunction(StreamInvokable<T> userInvokable) throws Exception {
- userInvokable.open(getTaskConfiguration());
- userInvokable.invoke();
- userInvokable.close();
- }
-
- protected abstract void setInputsOutputs();
-
- protected abstract void setInvokable();
-
- public String getName() {
- return name;
- }
-
- public int getInstanceID() {
- return instanceID;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
deleted file mode 100644
index 4483969..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
- AbstractStreamComponent {
-
- private OutputHandler<OUT> outputHandler;
-
- protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
- protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
-
- MutableObjectIterator<StreamRecord<IN1>> inputIter1;
- MutableObjectIterator<StreamRecord<IN2>> inputIter2;
-
- CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
- CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
-
- private CoInvokable<IN1, IN2, OUT> userInvokable;
- private static int numTasks;
-
- public CoStreamTask() {
- userInvokable = null;
- numTasks = newComponent();
- instanceID = numTasks;
- }
-
- private void setDeserializers() {
- TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
- inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
-
- TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
- inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
- }
-
- @Override
- public void setInputsOutputs() {
- outputHandler = new OutputHandler<OUT>(this);
-
- setConfigInputs();
-
- coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
- inputDeserializer1, inputDeserializer2);
- }
-
- @Override
- protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
- userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
- inputDeserializer2, isMutable);
- }
-
- protected void setConfigInputs() throws StreamComponentException {
- setDeserializers();
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
- ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = configuration.getInputType(i);
- switch (inputType) {
- case 1:
- inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
- this));
- break;
- case 2:
- inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
- this));
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
- }
- }
-
- coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
- inputList1, inputList2);
- }
-
- @Override
- public void invoke() throws Exception {
- outputHandler.invokeUserFunction("CO-TASK", userInvokable);
- }
-
-}