You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:58 UTC

[21/51] [abbrv] git commit: [streaming] Refactored StreamComponents

[streaming] Refactored StreamComponents


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d282eef1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d282eef1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d282eef1

Branch: refs/heads/master
Commit: d282eef1a316125debc9a85af9e6b909d319eb18
Parents: 330d8fd
Author: ghermann <re...@gmail.com>
Authored: Mon Jul 28 08:21:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       |  2 +-
 .../AbstractStreamComponent.java                | 66 +++++++++++++++++---
 .../api/streamcomponent/CoStreamTask.java       | 30 ++-------
 .../SingleInputAbstractStreamComponent.java     | 55 ++++++----------
 .../streamcomponent/StreamIterationSink.java    | 15 ++---
 .../streamcomponent/StreamIterationSource.java  | 15 ++---
 .../api/streamcomponent/StreamSink.java         | 26 ++------
 .../api/streamcomponent/StreamSource.java       | 23 ++-----
 .../api/streamcomponent/StreamTask.java         | 33 ++--------
 9 files changed, 109 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index d677046..fc4a1dd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -105,7 +105,7 @@ public class StreamConfig {
 		}
 	}
 
-	public StreamComponentInvokable getUserInvokableObject() {
+	public <T extends StreamComponentInvokable> T getUserInvokableObject() {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 775e722..22c079c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -26,6 +26,10 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -39,7 +43,9 @@ import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -60,15 +66,21 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	protected String name;
 	private static int numComponents = 0;
 	protected boolean isMutable;
-
+	protected Object function;
+	protected String functionName;
+	
 	protected static int newComponent() {
 		numComponents++;
 		return numComponents;
 	}
 
 	protected void initialize() {
-		configuration = new StreamConfig(getTaskConfiguration());
-		name = configuration.getComponentName();
+		this.configuration = new StreamConfig(getTaskConfiguration());
+		this.name = configuration.getComponentName();
+		this.isMutable = configuration.getMutability();
+		this.functionName = configuration.getFunctionName();
+		this.function = configuration.getFunction();
+
 	}
 
 	protected Collector<OUT> setCollector() {
@@ -83,6 +95,35 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		return collector;
 	}
 
+	protected void setSerializers() {
+		try {
+			if (functionName.equals("flatMap")) {
+				setSerializer(function, FlatMapFunction.class, 1);
+			} else if (functionName.equals("map")) {
+				setSerializer(function, MapFunction.class, 1);
+			} else if (functionName.equals("batchReduce")) {
+				setSerializer(function, GroupReduceFunction.class, 1);
+			} else if (functionName.equals("filter")) {
+				setSerializer(function, FilterFunction.class, 0);
+			} else if (functionName.equals("source")) {
+				setSerializer(function, UserSourceInvokable.class, 0);
+			} else if (functionName.equals("coMap")) {
+				setSerializer(function, CoMapFunction.class, 2);
+			} else if (functionName.equals("elements")) {
+				outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
+
+				outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
+				outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+						outTupleSerializer);
+			} else {
+				throw new Exception("Wrong operator name: " + functionName);
+			}
+		} catch (Exception e) {
+			throw new StreamComponentException(e);
+		}
+	
+	}
+	
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
 		outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
@@ -94,7 +135,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 
 	protected void setConfigOutputs(
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-
+		setSerializers();
+		setCollector();
+		
 		int numberOfOutputs = configuration.getNumberOfOutputs();
 
 		for (int i = 0; i < numberOfOutputs; i++) {
@@ -146,10 +189,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	 *            Class of the invokable function
 	 * @return The StreamComponent object
 	 */
-	protected StreamComponentInvokable getInvokable(
-			Class<? extends StreamComponentInvokable> userFunctionClass) {
-		
-		this.isMutable = configuration.getMutability();
+	protected <T extends StreamComponentInvokable> T getInvokable() {
 		return configuration.getUserInvokableObject();
 	}
 
@@ -170,6 +210,16 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
 
+
+	@Override
+	public void registerInputOutput() {
+		initialize();
+		setInputsOutputs();		
+		setInvokable();
+	}
+	
+	protected abstract void setInputsOutputs();
+	
 	protected abstract void setInvokable();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 6a9c897..4d531f7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -55,7 +55,6 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private CoInvokable<IN1, IN2, OUT> userFunction;
-//	private int[] numberOfOutputChannels;
 	private static int numTasks;
 
 	public CoStreamTask() {
@@ -84,7 +83,6 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
-
 		TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
 				function.getClass(), 0, null, null);
 		inTupleSerializer1 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
@@ -95,37 +93,17 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	}
 
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
-		setSerializers();
-		setCollector();
-
-		// inputs1 = setConfigInputs();
+	public void setInputsOutputs() {
+		setConfigOutputs(outputs);
 		setConfigInputs();
 
 		inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
-
-		// inputs2 = setConfigInputs();
 		inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
-
-		setConfigOutputs(outputs);
-
-//		numberOfOutputChannels = new int[outputs.size()];
-//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-//		}
-
-		setInvokable();
 	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+	
 	@Override
 	protected void setInvokable() {
-		// Default value is a CoMapInvokable
-		Class<? extends CoInvokable> userFunctionClass = configuration.getUserInvokableClass();
-		
-		userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
+		userFunction = getInvokable();
 		userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
 				inTupleSerializer2, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index 86b26ff..0b5b377 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -31,47 +31,35 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
 		AbstractStreamComponent<OUT> {
 
 	protected StreamRecordSerializer<IN> inTupleSerializer = null;
+	protected MutableObjectIterator<StreamRecord<IN>> inputIter;
+	protected MutableReader<IOReadableWritable> inputs;
 
-	protected void setSerializers() {
-		String operatorName = configuration.getFunctionName();
-
-		Object function = configuration.getFunction();
+	protected void setDeserializers() {
 		try {
-			if (operatorName.equals("flatMap")) {
-				setSerializerDeserializer(function, FlatMapFunction.class);
-			} else if (operatorName.equals("map")) {
-				setSerializerDeserializer(function, MapFunction.class);
-			} else if (operatorName.equals("batchReduce")) {
-				setSerializerDeserializer(function, GroupReduceFunction.class);
-			} else if (operatorName.equals("filter")) {
+			if (functionName.equals("flatMap")) {
+				setDeserializer(function, FlatMapFunction.class);
+			} else if (functionName.equals("map")) {
+				setDeserializer(function, MapFunction.class);
+			} else if (functionName.equals("batchReduce")) {
+				setDeserializer(function, GroupReduceFunction.class);
+			} else if (functionName.equals("filter")) {
 				setDeserializer(function, FilterFunction.class);
-				setSerializer(function, FilterFunction.class, 0);
-			} else if (operatorName.equals("sink")) {
-				setDeserializer(function, SinkFunction.class);
-			} else if (operatorName.equals("source")) {
+			} else if (functionName.equals("source")) {
 				setSerializer(function, UserSourceInvokable.class, 0);
-			} else if (operatorName.equals("coMap")) {
-				setSerializer(function, CoMapFunction.class, 2);
-				//setDeserializers(function, CoMapFunction.class);
-			} else if (operatorName.equals("elements")) {
-				outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
-				outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
-				outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
-						outTupleSerializer);
+			} else if (functionName.equals("sink")) {
+				setDeserializer(function, SinkFunction.class);
 			} else {
-				throw new Exception("Wrong operator name: " + operatorName);
+				throw new Exception("Wrong operator name: " + functionName);
 			}
 
 		} catch (Exception e) {
@@ -79,11 +67,6 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 		}
 	}
 
-	private void setSerializerDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
-		setDeserializer(function, clazz);
-		setSerializer(function, clazz, 1);
-	}
-
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
 		TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
@@ -102,12 +85,14 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 	}
 
 	@SuppressWarnings("unchecked")
-	protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
+	protected void setConfigInputs() throws StreamComponentException {
+		setDeserializers();
+		
 		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs < 2) {
 
-			return new MutableRecordReader<IOReadableWritable>(this);
+			inputs = new MutableRecordReader<IOReadableWritable>(this);
 
 		} else {
 			MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
@@ -115,7 +100,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 			for (int i = 0; i < numberOfInputs; i++) {
 				recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
 			}
-			return new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
+			inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 224fdfb..5532626 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
@@ -34,8 +33,6 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
 
 	private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
 
-	@SuppressWarnings("rawtypes")
-	private MutableReader inputs;
 	MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
@@ -45,22 +42,22 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
 	}
 
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
+	public void setInputsOutputs() {
 		try {
-			setSerializers();
+			setConfigInputs();
 			setSinkSerializer();
-			inputs = getConfigInputs();
+
 			inputIter = createInputIterator(inputs, inTupleSerializer);
+
 			iterationId = configuration.getIterationId();
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
+			
 		} catch (Exception e) {
 			throw new StreamComponentException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
 		}
 	}
-
+	
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 37e8e0f..d020058 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -38,7 +38,6 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private static int numSources;
-//	private int[] numberOfOutputChannels;
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
@@ -53,29 +52,23 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 	}
 
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
+	public void setInputsOutputs() {
 		try {
-			setSerializers();
 			setConfigOutputs(outputs);
+			setSinkSerializer();
 		} catch (StreamComponentException e) {
 			throw new StreamComponentException("Cannot register outputs", e);
 		}
 
-//		numberOfOutputChannels = new int[outputs.size()];
-//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-//		}
-
 		iterationId = configuration.getIterationId();
 		try {
 			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
 		} catch (Exception e) {
 
 		}
-	}
 
+	}
+	
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 8cbe0ea..7ac117e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -22,19 +22,12 @@ package org.apache.flink.streaming.api.streamcomponent;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.MutableObjectIterator;
 
 public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
 
 	private static final Log LOG = LogFactory.getLog(StreamSink.class);
 
-	@SuppressWarnings("rawtypes")
-	private MutableReader inputs;
-	private MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private StreamRecordInvokable<IN, IN> userFunction;
 
 	public StreamSink() {
@@ -42,28 +35,21 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 	}
 
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
+	public void setInputsOutputs() {
 		try {
-			setSerializers();
+			setConfigInputs();
 			setSinkSerializer();
-			inputs = getConfigInputs();
+			
 			inputIter = createInputIterator(inputs, inTupleSerializer);
 		} catch (Exception e) {
 			throw new StreamComponentException("Cannot register inputs for "
 					+ getClass().getSimpleName(), e);
 		}
-
-		setInvokable();
 	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+	
 	@Override
-	protected void setInvokable() {
-		Class<? extends SinkInvokable> userFunctionClass = configuration.getUserInvokableClass();
-		
-		userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
+	protected void setInvokable() {		
+		userFunction = getInvokable();
 		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index c4f38ce..856c917 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -37,7 +37,6 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private UserSourceInvokable<OUT> userFunction;
 	private static int numSources;
-//	private int[] numberOfOutputChannels;
 
 	public StreamSource() {
 
@@ -48,33 +47,19 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	}
 
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
+	public void setInputsOutputs() {
 		try {
-			setSerializers();
-			setCollector();
 			setConfigOutputs(outputs);
 		} catch (StreamComponentException e) {
 			throw new StreamComponentException("Cannot register outputs for "
 					+ getClass().getSimpleName(), e);
-		}
-
-//		numberOfOutputChannels = new int[outputs.size()];
-//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-//		}
-
-		setInvokable();
+		}		
 	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+	
 	@Override
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
-		Class<? extends UserSourceInvokable> userFunctionClass = configuration.getUserInvokableClass();
-//				.getClass("userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
-		userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
+		userFunction = getInvokable();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d282eef1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 12e064f..19b1c4b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -25,60 +25,39 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.MutableObjectIterator;
 
 public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 		SingleInputAbstractStreamComponent<IN, OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamTask.class);
 
-	private MutableReader<IOReadableWritable> inputs;
-	MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private StreamRecordInvokable<IN, OUT> userFunction;
-//	private int[] numberOfOutputChannels;
 	private static int numTasks;
 
 	public StreamTask() {
-
 		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 		userFunction = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
 	}
-
+	
 	@Override
-	public void registerInputOutput() {
-		initialize();
-
-		setSerializers();
-		setCollector();
-		inputs = getConfigInputs();
+	public void setInputsOutputs() {
+		setConfigInputs();
 		setConfigOutputs(outputs);
 
-		inputIter = createInputIterator(inputs, inTupleSerializer);
-
-//		numberOfOutputChannels = new int[outputs.size()];
-//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-//		}
-
-		setInvokable();
+		inputIter = createInputIterator(inputs, inTupleSerializer);		
 	}
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
+	
 	@Override
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
-		Class<? extends UserTaskInvokable> userFunctionClass = configuration.getUserInvokableClass();
-		userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
+		userFunction = getInvokable();
 		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}