You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:58 UTC
[21/51] [abbrv] git commit: [streaming] Refactored StreamComponents
[streaming] Refactored StreamComponents
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d282eef1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d282eef1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d282eef1
Branch: refs/heads/master
Commit: d282eef1a316125debc9a85af9e6b909d319eb18
Parents: 330d8fd
Author: ghermann <re...@gmail.com>
Authored: Mon Jul 28 08:21:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/StreamConfig.java | 2 +-
.../AbstractStreamComponent.java | 66 +++++++++++++++++---
.../api/streamcomponent/CoStreamTask.java | 30 ++-------
.../SingleInputAbstractStreamComponent.java | 55 ++++++----------
.../streamcomponent/StreamIterationSink.java | 15 ++---
.../streamcomponent/StreamIterationSource.java | 15 ++---
.../api/streamcomponent/StreamSink.java | 26 ++------
.../api/streamcomponent/StreamSource.java | 23 ++-----
.../api/streamcomponent/StreamTask.java | 33 ++--------
9 files changed, 109 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d677046..fc4a1dd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -105,7 +105,7 @@ public class StreamConfig {
}
}
- public StreamComponentInvokable getUserInvokableObject() {
+ public <T extends StreamComponentInvokable> T getUserInvokableObject() {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 775e722..22c079c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -26,6 +26,10 @@ import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -39,7 +43,9 @@ import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.collector.StreamCollector;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -60,15 +66,21 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected String name;
private static int numComponents = 0;
protected boolean isMutable;
-
+ protected Object function;
+ protected String functionName;
+
protected static int newComponent() {
numComponents++;
return numComponents;
}
protected void initialize() {
- configuration = new StreamConfig(getTaskConfiguration());
- name = configuration.getComponentName();
+ this.configuration = new StreamConfig(getTaskConfiguration());
+ this.name = configuration.getComponentName();
+ this.isMutable = configuration.getMutability();
+ this.functionName = configuration.getFunctionName();
+ this.function = configuration.getFunction();
+
}
protected Collector<OUT> setCollector() {
@@ -83,6 +95,35 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
return collector;
}
+ protected void setSerializers() {
+ try {
+ if (functionName.equals("flatMap")) {
+ setSerializer(function, FlatMapFunction.class, 1);
+ } else if (functionName.equals("map")) {
+ setSerializer(function, MapFunction.class, 1);
+ } else if (functionName.equals("batchReduce")) {
+ setSerializer(function, GroupReduceFunction.class, 1);
+ } else if (functionName.equals("filter")) {
+ setSerializer(function, FilterFunction.class, 0);
+ } else if (functionName.equals("source")) {
+ setSerializer(function, UserSourceInvokable.class, 0);
+ } else if (functionName.equals("coMap")) {
+ setSerializer(function, CoMapFunction.class, 2);
+ } else if (functionName.equals("elements")) {
+ outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
+
+ outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
+ outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+ outTupleSerializer);
+ } else {
+ throw new Exception("Wrong operator name: " + functionName);
+ }
+ } catch (Exception e) {
+ throw new StreamComponentException(e);
+ }
+
+ }
+
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
@@ -94,7 +135,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected void setConfigOutputs(
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-
+ setSerializers();
+ setCollector();
+
int numberOfOutputs = configuration.getNumberOfOutputs();
for (int i = 0; i < numberOfOutputs; i++) {
@@ -146,10 +189,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
* Class of the invokable function
* @return The StreamComponent object
*/
- protected StreamComponentInvokable getInvokable(
- Class<? extends StreamComponentInvokable> userFunctionClass) {
-
- this.isMutable = configuration.getMutability();
+ protected <T extends StreamComponentInvokable> T getInvokable() {
return configuration.getUserInvokableObject();
}
@@ -170,6 +210,16 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
return (T) SerializationUtils.deserialize(serializedObject);
}
+
+ @Override
+ public void registerInputOutput() {
+ initialize();
+ setInputsOutputs();
+ setInvokable();
+ }
+
+ protected abstract void setInputsOutputs();
+
protected abstract void setInvokable();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 6a9c897..4d531f7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -55,7 +55,6 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private CoInvokable<IN1, IN2, OUT> userFunction;
-// private int[] numberOfOutputChannels;
private static int numTasks;
public CoStreamTask() {
@@ -84,7 +83,6 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
-
TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
function.getClass(), 0, null, null);
inTupleSerializer1 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
@@ -95,37 +93,17 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
}
@Override
- public void registerInputOutput() {
- initialize();
-
- setSerializers();
- setCollector();
-
- // inputs1 = setConfigInputs();
+ public void setInputsOutputs() {
+ setConfigOutputs(outputs);
setConfigInputs();
inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
-
- // inputs2 = setConfigInputs();
inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
-
- setConfigOutputs(outputs);
-
-// numberOfOutputChannels = new int[outputs.size()];
-// for (int i = 0; i < numberOfOutputChannels.length; i++) {
-// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-// }
-
- setInvokable();
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
+
@Override
protected void setInvokable() {
- // Default value is a CoMapInvokable
- Class<? extends CoInvokable> userFunctionClass = configuration.getUserInvokableClass();
-
- userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
+ userFunction = getInvokable();
userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
inTupleSerializer2, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index 86b26ff..0b5b377 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -31,47 +31,35 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
AbstractStreamComponent<OUT> {
protected StreamRecordSerializer<IN> inTupleSerializer = null;
+ protected MutableObjectIterator<StreamRecord<IN>> inputIter;
+ protected MutableReader<IOReadableWritable> inputs;
- protected void setSerializers() {
- String operatorName = configuration.getFunctionName();
-
- Object function = configuration.getFunction();
+ protected void setDeserializers() {
try {
- if (operatorName.equals("flatMap")) {
- setSerializerDeserializer(function, FlatMapFunction.class);
- } else if (operatorName.equals("map")) {
- setSerializerDeserializer(function, MapFunction.class);
- } else if (operatorName.equals("batchReduce")) {
- setSerializerDeserializer(function, GroupReduceFunction.class);
- } else if (operatorName.equals("filter")) {
+ if (functionName.equals("flatMap")) {
+ setDeserializer(function, FlatMapFunction.class);
+ } else if (functionName.equals("map")) {
+ setDeserializer(function, MapFunction.class);
+ } else if (functionName.equals("batchReduce")) {
+ setDeserializer(function, GroupReduceFunction.class);
+ } else if (functionName.equals("filter")) {
setDeserializer(function, FilterFunction.class);
- setSerializer(function, FilterFunction.class, 0);
- } else if (operatorName.equals("sink")) {
- setDeserializer(function, SinkFunction.class);
- } else if (operatorName.equals("source")) {
+ } else if (functionName.equals("source")) {
setSerializer(function, UserSourceInvokable.class, 0);
- } else if (operatorName.equals("coMap")) {
- setSerializer(function, CoMapFunction.class, 2);
- //setDeserializers(function, CoMapFunction.class);
- } else if (operatorName.equals("elements")) {
- outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
- outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
- outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
- outTupleSerializer);
+ } else if (functionName.equals("sink")) {
+ setDeserializer(function, SinkFunction.class);
} else {
- throw new Exception("Wrong operator name: " + operatorName);
+ throw new Exception("Wrong operator name: " + functionName);
}
} catch (Exception e) {
@@ -79,11 +67,6 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
}
}
- private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
- setDeserializer(function, clazz);
- setSerializer(function, clazz, 1);
- }
-
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
@@ -102,12 +85,14 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
}
@SuppressWarnings("unchecked")
- protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
+ protected void setConfigInputs() throws StreamComponentException {
+ setDeserializers();
+
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs < 2) {
- return new MutableRecordReader<IOReadableWritable>(this);
+ inputs = new MutableRecordReader<IOReadableWritable>(this);
} else {
MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
@@ -115,7 +100,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
}
- return new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
+ inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 224fdfb..5532626 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
@@ -34,8 +33,6 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
- @SuppressWarnings("rawtypes")
- private MutableReader inputs;
MutableObjectIterator<StreamRecord<IN>> inputIter;
private String iterationId;
@SuppressWarnings("rawtypes")
@@ -45,22 +42,22 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
}
@Override
- public void registerInputOutput() {
- initialize();
-
+ public void setInputsOutputs() {
try {
- setSerializers();
+ setConfigInputs();
setSinkSerializer();
- inputs = getConfigInputs();
+
inputIter = createInputIterator(inputs, inTupleSerializer);
+
iterationId = configuration.getIterationId();
dataChannel = BlockingQueueBroker.instance().get(iterationId);
+
} catch (Exception e) {
throw new StreamComponentException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
}
}
-
+
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 37e8e0f..d020058 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -38,7 +38,6 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private static int numSources;
-// private int[] numberOfOutputChannels;
private String iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
@@ -53,29 +52,23 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
}
@Override
- public void registerInputOutput() {
- initialize();
-
+ public void setInputsOutputs() {
try {
- setSerializers();
setConfigOutputs(outputs);
+ setSinkSerializer();
} catch (StreamComponentException e) {
throw new StreamComponentException("Cannot register outputs", e);
}
-// numberOfOutputChannels = new int[outputs.size()];
-// for (int i = 0; i < numberOfOutputChannels.length; i++) {
-// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-// }
-
iterationId = configuration.getIterationId();
try {
BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
} catch (Exception e) {
}
- }
+ }
+
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 8cbe0ea..7ac117e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -22,19 +22,12 @@ package org.apache.flink.streaming.api.streamcomponent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.MutableObjectIterator;
public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
private static final Log LOG = LogFactory.getLog(StreamSink.class);
- @SuppressWarnings("rawtypes")
- private MutableReader inputs;
- private MutableObjectIterator<StreamRecord<IN>> inputIter;
private StreamRecordInvokable<IN, IN> userFunction;
public StreamSink() {
@@ -42,28 +35,21 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
}
@Override
- public void registerInputOutput() {
- initialize();
-
+ public void setInputsOutputs() {
try {
- setSerializers();
+ setConfigInputs();
setSinkSerializer();
- inputs = getConfigInputs();
+
inputIter = createInputIterator(inputs, inTupleSerializer);
} catch (Exception e) {
throw new StreamComponentException("Cannot register inputs for "
+ getClass().getSimpleName(), e);
}
-
- setInvokable();
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
+
@Override
- protected void setInvokable() {
- Class<? extends SinkInvokable> userFunctionClass = configuration.getUserInvokableClass();
-
- userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
+ protected void setInvokable() {
+ userFunction = getInvokable();
userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index c4f38ce..856c917 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -37,7 +37,6 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private UserSourceInvokable<OUT> userFunction;
private static int numSources;
-// private int[] numberOfOutputChannels;
public StreamSource() {
@@ -48,33 +47,19 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
}
@Override
- public void registerInputOutput() {
- initialize();
-
+ public void setInputsOutputs() {
try {
- setSerializers();
- setCollector();
setConfigOutputs(outputs);
} catch (StreamComponentException e) {
throw new StreamComponentException("Cannot register outputs for "
+ getClass().getSimpleName(), e);
- }
-
-// numberOfOutputChannels = new int[outputs.size()];
-// for (int i = 0; i < numberOfOutputChannels.length; i++) {
-// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-// }
-
- setInvokable();
+ }
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
+
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
- Class<? extends UserSourceInvokable> userFunctionClass = configuration.getUserInvokableClass();
-// .getClass("userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
- userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
+ userFunction = getInvokable();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 12e064f..19b1c4b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -25,60 +25,39 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.MutableObjectIterator;
public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
SingleInputAbstractStreamComponent<IN, OUT> {
private static final Log LOG = LogFactory.getLog(StreamTask.class);
- private MutableReader<IOReadableWritable> inputs;
- MutableObjectIterator<StreamRecord<IN>> inputIter;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private StreamRecordInvokable<IN, OUT> userFunction;
-// private int[] numberOfOutputChannels;
private static int numTasks;
public StreamTask() {
-
outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
userFunction = null;
numTasks = newComponent();
instanceID = numTasks;
}
-
+
@Override
- public void registerInputOutput() {
- initialize();
-
- setSerializers();
- setCollector();
- inputs = getConfigInputs();
+ public void setInputsOutputs() {
+ setConfigInputs();
setConfigOutputs(outputs);
- inputIter = createInputIterator(inputs, inTupleSerializer);
-
-// numberOfOutputChannels = new int[outputs.size()];
-// for (int i = 0; i < numberOfOutputChannels.length; i++) {
-// numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-// }
-
- setInvokable();
+ inputIter = createInputIterator(inputs, inTupleSerializer);
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
+
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
- Class<? extends UserTaskInvokable> userFunctionClass = configuration.getUserInvokableClass();
- userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
+ userFunction = getInvokable();
userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}