You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:54 UTC

[17/51] [abbrv] git commit: [streaming] Moved task configurations to StreamConfig

[streaming] Moved task configurations to StreamConfig


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/330d8fd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/330d8fd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/330d8fd5

Branch: refs/heads/master
Commit: 330d8fd524ba6cbc1ec96132ee75698d1f0d4af3
Parents: a2c4137
Author: ghermann <re...@gmail.com>
Authored: Thu Jul 24 14:57:28 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:11 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       | 242 +++++++++++++++++++
 .../AbstractStreamComponent.java                |  53 ++--
 .../api/streamcomponent/CoStreamTask.java       |  30 +--
 .../SingleInputAbstractStreamComponent.java     |  13 +-
 .../streamcomponent/StreamIterationSink.java    |   2 +-
 .../streamcomponent/StreamIterationSource.java  |  12 +-
 .../api/streamcomponent/StreamSink.java         |   4 +-
 .../api/streamcomponent/StreamSource.java       |  14 +-
 .../api/streamcomponent/StreamTask.java         |  13 +-
 .../api/streamcomponent/StreamWindowTask.java   |  96 --------
 .../examples/window/sum/WindowSumAggregate.java |  67 -----
 .../examples/window/sum/WindowSumLocal.java     |  43 ----
 .../examples/window/sum/WindowSumMultiple.java  |  36 ---
 .../examples/window/sum/WindowSumSink.java      |  31 ---
 .../examples/window/sum/WindowSumSource.java    |  41 ----
 .../wordcount/WindowWordCountCounter.java       |  82 -------
 .../window/wordcount/WindowWordCountLocal.java  |  50 ----
 .../window/wordcount/WindowWordCountSink.java   |  32 ---
 .../wordcount/WindowWordCountSplitter.java      |  46 ----
 19 files changed, 293 insertions(+), 614 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
new file mode 100644
index 0000000..d677046
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -0,0 +1,242 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+public class StreamConfig {
+	private static final String INPUT_TYPE = "inputType_";
+	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
+	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+	private static final String OUTPUT_NAME = "outputName_";
+	private static final String PARTITIONER_OBJECT = "partitionerObject_";
+	private static final String USER_DEFINED_NAME = "userDefinedName";
+	private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
+	private static final String ITERATION_ID = "iteration-id";
+	private static final String OUTPUT_SELECTOR = "outputSelector";
+	private static final String DIRECTED_EMIT = "directedEmit";
+	private static final String FUNCTION_NAME = "operatorName";
+	private static final String FUNCTION = "operator";
+	private static final String COMPONENT_NAME = "componentName";
+	private static final String SERIALIZEDUDF = "serializedudf";
+	private static final String USER_FUNCTION = "userfunction";
+	private static final String BUFFER_TIMEOUT = "bufferTimeout";
+
+	// DEFAULT VALUES
+
+	private static final boolean DEFAULT_IS_MUTABLE = false;
+
+	private static final long DEFAULT_TIMEOUT = 0;
+
+	// STRINGS
+
+	private static final String MUTABILITY = "isMutable";
+
+	private Configuration config;
+
+	public StreamConfig(Configuration config) {
+		this.config = config;
+	}
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	// CONFIGS
+
+	public void setMutability(boolean isMutable) {
+		config.setBoolean(MUTABILITY, isMutable);
+	}
+
+	public boolean getMutability() {
+		return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE);
+	}
+
+	public void setBufferTimeout(long timeout) {
+		config.setLong(BUFFER_TIMEOUT, timeout);
+	}
+
+	public long getBufferTimeout() {
+		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
+	}
+
+	public void setUserInvokableClass(Class<? extends StreamComponentInvokable> clazz) {
+		config.setClass(USER_FUNCTION, clazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T extends StreamComponentInvokable> Class<? extends T> getUserInvokableClass() {
+		return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
+	}
+
+	public void setUserInvokableObject(StreamComponentInvokable invokableObject) {
+		try {
+			config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize invokable object "
+					+ invokableObject.getClass(), e);
+		}
+	}
+
+	public StreamComponentInvokable getUserInvokableObject() {
+		try {
+			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
+		} catch (Exception e) {
+			new StreamComponentException("Cannot instantiate user function");
+		}
+		return null;
+	}
+
+	public void setComponentName(String componentName) {
+		config.setString(COMPONENT_NAME, componentName);
+	}
+
+	public String getComponentName() {
+		return config.getString(COMPONENT_NAME, null);
+	}
+
+	public void setFunction(byte[] serializedFunction) {
+		config.setBytes(FUNCTION, serializedFunction);
+	}
+
+	public Object getFunction() {
+		try {
+			return SerializationUtils.deserialize(config
+					.getBytes(FUNCTION, null));
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot deserialize invokable object", e);
+		}
+	}
+
+	public void setFunctionName(String functionName) {
+		config.setString(FUNCTION_NAME, functionName);
+	}
+
+	public String getFunctionName() {
+		return config.getString(FUNCTION_NAME, "");
+	}
+	
+	public void setUserDefinedName(String userDefinedName) {
+		if (userDefinedName != null) {
+			config.setString(USER_DEFINED_NAME, userDefinedName);
+		}
+	}
+
+	public void setDirectedEmit(boolean directedEmit) {
+		config.setBoolean(DIRECTED_EMIT, directedEmit);
+	}
+
+	public boolean getDirectedEmit() {
+		return config.getBoolean(DIRECTED_EMIT, false);
+	}
+
+	public void setOutputSelector(byte[] outputSelector) {
+		config.setBytes(OUTPUT_SELECTOR, outputSelector);
+
+	}
+
+	public <T extends Tuple> OutputSelector<T> getOutputSelector() {
+		try {
+			return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
+		} catch (Exception e) {
+			throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+					e);
+		}
+	}
+
+	public void setIterationId(String iterationId) {
+		config.setString(ITERATION_ID, iterationId);
+	}
+	
+	public String getIterationId() {
+		return config.getString(ITERATION_ID, "iteration-0");
+	}
+
+	public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
+		config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
+	}
+
+	public int getNumberOfOutputChannels(int outputIndex) {
+		return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
+	}
+
+	public <T extends Tuple> void setPartitioner(int outputIndex,
+			StreamPartitioner<T> partitionerObject) {
+
+		config.setBytes(PARTITIONER_OBJECT + outputIndex,
+				SerializationUtils.serialize(partitionerObject));
+	}
+
+	public <T extends Tuple> StreamPartitioner<T> getPartitioner(int outputIndex)
+			throws ClassNotFoundException, IOException {
+		return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
+				SerializationUtils.serialize(new ShufflePartitioner<T>())));
+	}
+
+	public void setOutputName(int outputIndex, String outputName) {
+		if (outputName != null) {
+			config.setString(OUTPUT_NAME + outputIndex, outputName);
+		}
+	}
+
+	public String getOutputName(int outputIndex) {
+		return config.getString(OUTPUT_NAME + outputIndex, null);
+	}
+
+	public void setNumberOfInputs(int numberOfInputs) {
+		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
+	}
+
+	public int getNumberOfInputs() {
+		return config.getInteger(NUMBER_OF_INPUTS, 0);
+	}
+
+	public void setNumberOfOutputs(int numberOfOutputs) {
+		config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+	}
+
+	public int getNumberOfOutputs() {
+		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+	}
+
+	public void setInputType(int inputNumber, Integer inputTypeNumber) {
+		config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
+	}
+
+	public int getInputType(int inputNumber) {
+		return config.getInteger(INPUT_TYPE + inputNumber, 0);
+	}
+	
+	@SuppressWarnings("unchecked")
+	protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
+			ClassNotFoundException {
+		return (T) SerializationUtils.deserialize(serializedObject);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 8afbddf..775e722 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -29,20 +29,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
@@ -55,7 +54,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	protected StreamRecordSerializer<OUT> outTupleSerializer = null;
 	protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
-	protected Configuration configuration;
+	protected StreamConfig configuration;
 	protected StreamCollector<OUT> collector;
 	protected int instanceID;
 	protected String name;
@@ -68,19 +67,13 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	}
 
 	protected void initialize() {
-		configuration = getTaskConfiguration();
-		name = configuration.getString("componentName", "MISSING_COMPONENT_NAME");
+		configuration = new StreamConfig(getTaskConfiguration());
+		name = configuration.getComponentName();
 	}
 
 	protected Collector<OUT> setCollector() {
-		if (configuration.getBoolean("directedEmit", false)) {
-			OutputSelector<OUT> outputSelector = null;
-			try {
-				outputSelector = deserializeObject(configuration.getBytes("outputSelector", null));
-			} catch (Exception e) {
-				throw new StreamComponentException(
-						"Cannot deserialize and instantiate OutputSelector", e);
-			}
+		if (configuration.getDirectedEmit()) {
+			OutputSelector<OUT> outputSelector = configuration.getOutputSelector();
 
 			collector = new DirectedStreamCollector<OUT>(instanceID, outSerializationDelegate,
 					outputSelector);
@@ -102,7 +95,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	protected void setConfigOutputs(
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 
-		int numberOfOutputs = configuration.getInteger("numberOfOutputs", 0);
+		int numberOfOutputs = configuration.getNumberOfOutputs();
 
 		for (int i = 0; i < numberOfOutputs; i++) {
 			setPartitioner(i, outputs);
@@ -111,17 +104,14 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 
 	private void setPartitioner(int outputNumber,
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
-
-		byte[] serializedPartitioner = configuration.getBytes("partitionerObject_" + outputNumber,
-				SerializationUtils.serialize((new ShufflePartitioner<OUT>())));
 		StreamPartitioner<OUT> outputPartitioner = null;
-
+		
 		try {
-			outputPartitioner = deserializeObject(serializedPartitioner);
+			outputPartitioner = configuration.getPartitioner(outputNumber);
 
 			RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
-			long bufferTimeout = configuration.getLong("bufferTimeout", 0);
+			long bufferTimeout = configuration.getBufferTimeout();
 
 			if (bufferTimeout > 0) {
 				output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(this,
@@ -132,7 +122,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 			}
 
 			outputs.add(output);
-			String outputName = configuration.getString("outputName_" + outputNumber, null);
+			String outputName = configuration.getOutputName(outputNumber);
 
 			if (collector != null) {
 				collector.addOutput(output, outputName);
@@ -143,7 +133,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 						+ " with " + outputNumber + " outputs");
 			}
 		} catch (Exception e) {
-			throw new StreamComponentException("Cannot deserialize "
+			throw new StreamComponentException("Cannot deserialize partitioner "
 					+ outputPartitioner.getClass().getSimpleName() + " of " + name + " with "
 					+ outputNumber + " outputs", e);
 		}
@@ -158,22 +148,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	 */
 	protected StreamComponentInvokable getInvokable(
 			Class<? extends StreamComponentInvokable> userFunctionClass) {
-		StreamComponentInvokable userFunction = null;
-
-		byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
-		this.isMutable = configuration.getBoolean("isMutable", false);
-
-		try {
-			userFunction = deserializeObject(userFunctionSerialized);
-		} catch (ClassNotFoundException e) {
-			new StreamComponentException("Cannot instantiate user function: "
-					+ userFunctionClass.getSimpleName());
-		} catch (IOException e) {
-			new StreamComponentException("Cannot instantiate user function: "
-					+ userFunctionClass.getSimpleName());
-		}
-
-		return userFunction;
+		
+		this.isMutable = configuration.getMutability();
+		return configuration.getUserInvokableObject();
 	}
 
 	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 60a8152..6a9c897 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -19,8 +19,6 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.MutableObjectIterator;
@@ -58,7 +55,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private CoInvokable<IN1, IN2, OUT> userFunction;
-	private int[] numberOfOutputChannels;
+//	private int[] numberOfOutputChannels;
 	private static int numTasks;
 
 	public CoStreamTask() {
@@ -70,21 +67,16 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	}
 
 	protected void setSerializers() {
-		byte[] operatorBytes = configuration.getBytes("operator", null);
-		String operatorName = configuration.getString("operatorName", "");
+		String operatorName = configuration.getFunctionName();
 
-		Object function = null;
+		Object function = configuration.getFunction();
 		try {
-			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
-			function = in.readObject();
-
 			if (operatorName.equals("coMap")) {
 				setSerializer(function, CoMapFunction.class, 2);
 				setDeserializers(function, CoMapFunction.class);
 			} else {
 				throw new Exception("Wrong operator name!");
 			}
-
 		} catch (Exception e) {
 			throw new StreamComponentException(e);
 		}
@@ -119,10 +111,10 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 		setConfigOutputs(outputs);
 
-		numberOfOutputChannels = new int[outputs.size()];
-		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-		}
+//		numberOfOutputChannels = new int[outputs.size()];
+//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
+//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+//		}
 
 		setInvokable();
 	}
@@ -131,21 +123,21 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	@Override
 	protected void setInvokable() {
 		// Default value is a CoMapInvokable
-		Class<? extends CoInvokable> userFunctionClass = configuration.getClass("userfunction",
-				CoMapInvokable.class, CoInvokable.class);
+		Class<? extends CoInvokable> userFunctionClass = configuration.getUserInvokableClass();
+		
 		userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
 		userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
 				inTupleSerializer2, isMutable);
 	}
 
 	protected void setConfigInputs() throws StreamComponentException {
-		int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+		int numberOfInputs = configuration.getNumberOfInputs();
 
 		ArrayList<MutableRecordReader<IOReadableWritable>> inputList1 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
 		ArrayList<MutableRecordReader<IOReadableWritable>> inputList2 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
 
 		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = configuration.getInteger("inputType_" + i, 0);
+			int inputType = configuration.getInputType(i);
 			switch (inputType) {
 			case 1:
 				inputList1.add(new MutableRecordReader<IOReadableWritable>(this));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index b49620f..86b26ff 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
-
 import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.api.java.functions.FlatMapFunction;
@@ -47,14 +44,10 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 	protected StreamRecordSerializer<IN> inTupleSerializer = null;
 
 	protected void setSerializers() {
-		byte[] operatorBytes = configuration.getBytes("operator", null);
-		String operatorName = configuration.getString("operatorName", "");
+		String operatorName = configuration.getFunctionName();
 
-		Object function = null;
+		Object function = configuration.getFunction();
 		try {
-			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
-			function = in.readObject();
-
 			if (operatorName.equals("flatMap")) {
 				setSerializerDeserializer(function, FlatMapFunction.class);
 			} else if (operatorName.equals("map")) {
@@ -110,7 +103,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 
 	@SuppressWarnings("unchecked")
 	protected MutableReader<IOReadableWritable> getConfigInputs() throws StreamComponentException {
-		int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
+		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs < 2) {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 1b25285..224fdfb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -53,7 +53,7 @@ public class StreamIterationSink<IN extends Tuple> extends SingleInputAbstractSt
 			setSinkSerializer();
 			inputs = getConfigInputs();
 			inputIter = createInputIterator(inputs, inTupleSerializer);
-			iterationId = configuration.getString("iteration-id", "iteration-0");
+			iterationId = configuration.getIterationId();
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
 		} catch (Exception e) {
 			throw new StreamComponentException(String.format(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index f880470..37e8e0f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -38,7 +38,7 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private static int numSources;
-	private int[] numberOfOutputChannels;
+//	private int[] numberOfOutputChannels;
 	private String iterationId;
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
@@ -63,12 +63,12 @@ public class StreamIterationSource<OUT extends Tuple> extends SingleInputAbstrac
 			throw new StreamComponentException("Cannot register outputs", e);
 		}
 
-		numberOfOutputChannels = new int[outputs.size()];
-		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-		}
+//		numberOfOutputChannels = new int[outputs.size()];
+//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
+//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+//		}
 
-		iterationId = configuration.getString("iteration-id", "iteration-0");
+		iterationId = configuration.getIterationId();
 		try {
 			BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 5e3457c..8cbe0ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -61,8 +61,8 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Override
 	protected void setInvokable() {
-		Class<? extends SinkInvokable> userFunctionClass = configuration.getClass("userfunction",
-				SinkInvokable.class, SinkInvokable.class);
+		Class<? extends SinkInvokable> userFunctionClass = configuration.getUserInvokableClass();
+		
 		userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
 		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 12c9ba3..c4f38ce 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -37,7 +37,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private UserSourceInvokable<OUT> userFunction;
 	private static int numSources;
-	private int[] numberOfOutputChannels;
+//	private int[] numberOfOutputChannels;
 
 	public StreamSource() {
 
@@ -60,10 +60,10 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 					+ getClass().getSimpleName(), e);
 		}
 
-		numberOfOutputChannels = new int[outputs.size()];
-		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-		}
+//		numberOfOutputChannels = new int[outputs.size()];
+//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
+//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+//		}
 
 		setInvokable();
 	}
@@ -72,8 +72,8 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
 	@Override
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
-		Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
-				"userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
+		Class<? extends UserSourceInvokable> userFunctionClass = configuration.getUserInvokableClass();
+//				.getClass("userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
 		userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 5032446..12e064f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -43,7 +43,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
 	private StreamRecordInvokable<IN, OUT> userFunction;
-	private int[] numberOfOutputChannels;
+//	private int[] numberOfOutputChannels;
 	private static int numTasks;
 
 	public StreamTask() {
@@ -65,10 +65,10 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 
 		inputIter = createInputIterator(inputs, inTupleSerializer);
 
-		numberOfOutputChannels = new int[outputs.size()];
-		for (int i = 0; i < numberOfOutputChannels.length; i++) {
-			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
-		}
+//		numberOfOutputChannels = new int[outputs.size()];
+//		for (int i = 0; i < numberOfOutputChannels.length; i++) {
+//			numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
+//		}
 
 		setInvokable();
 	}
@@ -77,8 +77,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 	@Override
 	protected void setInvokable() {
 		// Default value is a TaskInvokable even if it was called from a source
-		Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
-				"userfunction", UserTaskInvokable.class, UserTaskInvokable.class);
+		Class<? extends UserTaskInvokable> userFunctionClass = configuration.getUserInvokableClass();
 		userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
 		userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
deleted file mode 100644
index c044cc3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamWindowTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.state.SlidingWindowState;
-import org.apache.flink.streaming.state.StateManager;
-import org.apache.flink.util.Collector;
-
-public class StreamWindowTask<InTuple extends Tuple, OutTuple extends Tuple>
-		extends FlatMapFunction<InTuple, OutTuple> {
-	private static final long serialVersionUID = 1L;
-
-	private int computeGranularity;
-	private int windowFieldId;
-
-	private ArrayList<InTuple> tempTupleArray;
-	private SlidingWindowState<InTuple> window;
-	private long initTimestamp = -1;
-	private long nextTimestamp = -1;
-
-	protected StateManager checkpointer = new StateManager("object.out", 1000);
-
-	public StreamWindowTask(int windowSize, int slidingStep,
-			int computeGranularity, int windowFieldId) {
-		this.computeGranularity = computeGranularity;
-		this.windowFieldId = windowFieldId;
-		window = new SlidingWindowState<InTuple>(windowSize, slidingStep,
-				computeGranularity);
-		checkpointer.registerState(window);
-		Thread t = new Thread(checkpointer);
-		t.start();
-	}
-
-	protected void incrementCompute(ArrayList<InTuple> tupleArray) {
-	}
-
-	protected void decrementCompute(ArrayList<InTuple> tupleArray) {
-	}
-
-	protected void produceOutput(long progress, Collector<OutTuple> out) {
-	}
-
-	@Override
-	public void flatMap(InTuple value, Collector<OutTuple> out)
-			throws Exception {
-		long progress = (Long) value.getField(windowFieldId);
-		if (initTimestamp == -1) {
-			initTimestamp = progress;
-			nextTimestamp = initTimestamp + computeGranularity;
-			tempTupleArray = new ArrayList<InTuple>();
-		} else {
-			if (progress > nextTimestamp) {
-				if (window.isFull()) {
-					ArrayList<InTuple> expiredTupleArray = window.popFront();
-					incrementCompute(tempTupleArray);
-					decrementCompute(expiredTupleArray);
-					window.pushBack(tempTupleArray);
-					if (window.isEmittable()) {
-						produceOutput(progress, out);
-					}
-				} else {
-					incrementCompute(tempTupleArray);
-					window.pushBack(tempTupleArray);
-					if (window.isFull()) {
-						produceOutput(progress, out);
-					}
-				}
-				initTimestamp = nextTimestamp;
-				nextTimestamp = initTimestamp + computeGranularity;
-				tempTupleArray = new ArrayList<InTuple>();
-			}
-			tempTupleArray.add(value);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
deleted file mode 100644
index 12ada98..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumAggregate.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamcomponent.StreamWindowTask;
-import org.apache.flink.streaming.state.TableState;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowSumAggregate extends
-		StreamWindowTask<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
-	private static final long serialVersionUID = -2832409561059237150L;
-	private TableState<String, Integer> sum;
-	private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
-	
-	
-	public WindowSumAggregate(int windowSize, int slidingStep,
-			int computeGranularity, int windowFieldId) {
-		super(windowSize, slidingStep, computeGranularity, windowFieldId);
-		sum = new TableState<String, Integer>();
-		sum.put("sum", 0);
-		checkpointer.registerState(sum);
-	}
-
-	@Override
-	protected void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
-		for (int i = 0; i < tupleArray.size(); ++i) {
-			int number = tupleArray.get(i).f0;
-			sum.put("sum", sum.get("sum") + number);
-		}
-	}
-
-	@Override
-	protected void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
-		for (int i = 0; i < tupleArray.size(); ++i) {
-			int number = tupleArray.get(i).f0;
-			sum.put("sum", sum.get("sum") - number);
-		}
-	}
-
-	@Override
-	protected void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out){
-		outTuple.f0 = sum.get("sum");
-		outTuple.f1 = progress;
-		out.collect(outTuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
deleted file mode 100644
index c8390f0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumLocal.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumLocal {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-
-	public static void main(String[] args) {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple2<Integer, Long>> dataStream = env
-				.addSource(new WindowSumSource(), SOURCE_PARALLELISM).map(new WindowSumMultiple())
-				.flatMap(new WindowSumAggregate(100, 20, 10, 1)).addSink(new WindowSumSink());
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
deleted file mode 100644
index 8d4ac0c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumMultiple.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumMultiple extends MapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
-	private static final long serialVersionUID = 1L;
-	
-	private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
-
-	@Override
-	public Tuple2<Integer, Long> map(Tuple2<Integer, Long> inTuple) throws Exception {
-		outTuple.f0 = inTuple.f0 * 2;
-		outTuple.f1 = inTuple.f1;
-		return outTuple;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
deleted file mode 100644
index 9c4547a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSink.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
-	private static final long serialVersionUID = 1L;
-	@Override
-	public void invoke(Tuple2<Integer, Long> inTuple) {
-		System.out.println(inTuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
deleted file mode 100644
index 12eda38..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/sum/WindowSumSource.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.sum;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
-	private static final long serialVersionUID = 1L;
-	
-	private Tuple2<Integer, Long> outRecord = new Tuple2<Integer, Long>();
-	private Long timestamp = 0L;
-
-	@Override
-	public void invoke(Collector<Tuple2<Integer, Long>> collector) throws Exception {
-		for (int i = 0; i < 1000; ++i) {
-			outRecord.f0 = i;
-			outRecord.f1 = timestamp;
-			collector.collect(outRecord);
-			timestamp++;
-		}		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
deleted file mode 100644
index dd99271..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountCounter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamcomponent.StreamWindowTask;
-import org.apache.flink.streaming.state.TableState;
-import org.apache.flink.streaming.state.TableStateIterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-
-public class WindowWordCountCounter extends
-		StreamWindowTask<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
-	private static final long serialVersionUID = 1L;
-
-	private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
-	private TableState<String, Integer> wordCounts;
-
-	public WindowWordCountCounter(int windowSize, int slidingStep,
-			int computeGranularity, int windowFieldId) {
-		super(windowSize, slidingStep, computeGranularity, windowFieldId);
-		wordCounts = new TableState<String, Integer>();
-	}
-	
-	@Override
-	protected void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
-		for (int i = 0; i < tupleArray.size(); ++i) {
-			String word = tupleArray.get(i).f0;
-			if (wordCounts.containsKey(word)) {
-				int count = wordCounts.get(word) + 1;
-				wordCounts.put(word, count);
-			} else {
-				wordCounts.put(word, 1);
-			}
-		}
-	}
-
-	@Override
-	protected void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
-		for (int i = 0; i < tupleArray.size(); ++i) {
-			String word = tupleArray.get(i).f0;
-			int count = wordCounts.get(word) - 1;
-			if (count == 0) {
-				wordCounts.delete(word);
-			} else {
-				wordCounts.put(word, count);
-			}
-		}
-	}
-
-	@Override
-	protected void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
-		TableStateIterator<String, Integer> iterator = wordCounts.getIterator();
-		while (iterator.hasNext()) {
-			Tuple2<String, Integer> tuple = iterator.next();
-			outTuple.f0 = tuple.f0;
-			outTuple.f1 = tuple.f1;
-			outTuple.f2 = progress;
-			out.collect(outTuple);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
deleted file mode 100644
index 2ef5af9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountLocal.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestDataUtil;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class WindowWordCountLocal {
-
-	private static final int PARALLELISM = 1;
-
-	// This example will count the occurrence of each word in the input file with a sliding window.
-	
-	public static void main(String[] args) {
-		
-		TestDataUtil.downloadIfNotExists("hamlet.txt");
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM);
-		
-		@SuppressWarnings("unused")
-		DataStream<Tuple3<String, Integer, Long>> dataStream = env
-				.readTextStream("src/test/resources/testdata/hamlet.txt")
-				.flatMap(new WindowWordCountSplitter())
-				.partitionBy(0)
-				.flatMap(new WindowWordCountCounter(10, 2, 1, 1))
-				.addSink(new WindowWordCountSink());
-		
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
deleted file mode 100644
index f8c009f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSink.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void invoke(Tuple3<String, Integer, Long> inTuple) {
-		System.out.println(inTuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/330d8fd5/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
deleted file mode 100644
index 19a533e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/wordcount/WindowWordCountSplitter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.examples.window.wordcount;
-
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WindowWordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple2<String, Long>> {
-	private static final long serialVersionUID = 1L;
-	
-	private String[] words = new String[] {};
-	private Long timestamp = 0L;
-	private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
-
-	// Splits the lines according to the spaces. And adds the line's timestamp to them.
-	@Override
-	public void flatMap(Tuple1<String> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
-
-		words=inTuple.f0.split(" ");
-		timestamp=System.currentTimeMillis();
-		for(String word : words){
-			outTuple.f0 = word;
-			outTuple.f1 = timestamp;
-			out.collect(outTuple);
-		}
-	}
-}
\ No newline at end of file