Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:53 UTC

[10/18] git commit: [streaming] DataStream type refactor for easier future extensions

[streaming] DataStream type refactor for easier future extensions


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9cbd68a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9cbd68a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9cbd68a8

Branch: refs/heads/master
Commit: 9cbd68a8430cb4da4e63275974a3626d596896c2
Parents: 13a9277
Author: gyfora <gy...@gmail.com>
Authored: Sat Sep 6 18:14:07 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  82 +++++++++------
 .../api/datastream/GroupedDataStream.java       | 105 +++++++------------
 .../api/datastream/IterativeDataStream.java     |  14 +--
 .../api/datastream/MergedDataStream.java        |  96 -----------------
 .../state/SlidingWindowStateIterator.java       |   2 +
 5 files changed, 89 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d78ceae..bd5b83c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -81,7 +81,7 @@ import org.apache.flink.types.TypeInformation;
  *            The type of the DataStream, i.e., the type of the elements of the
  *            DataStream.
  */
-public abstract class DataStream<OUT> {
+public class DataStream<OUT> {
 
 	protected static Integer counter = 0;
 	protected final StreamExecutionEnvironment environment;
@@ -91,6 +91,7 @@ public abstract class DataStream<OUT> {
 	protected boolean selectAll;
 	protected StreamPartitioner<OUT> partitioner;
 	protected TypeSerializerWrapper<OUT> outTypeWrapper;
+	protected List<DataStream<OUT>> mergedStreams;
 
 	protected final JobGraphBuilder jobGraphBuilder;
 
@@ -120,6 +121,8 @@ public abstract class DataStream<OUT> {
 		this.selectAll = false;
 		this.partitioner = new ForwardPartitioner<OUT>();
 		this.outTypeWrapper = outTypeWrapper;
+		this.mergedStreams = new ArrayList<DataStream<OUT>>();
+		this.mergedStreams.add(this);
 	}
 
 	/**
@@ -137,6 +140,14 @@ public abstract class DataStream<OUT> {
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 		this.outTypeWrapper = dataStream.outTypeWrapper;
+		this.mergedStreams = new ArrayList<DataStream<OUT>>();
+		this.mergedStreams.add(this);
+		if (dataStream.mergedStreams.size() > 1) {
+			for (int i = 1; i < dataStream.mergedStreams.size(); i++) {
+				this.mergedStreams.add(new DataStream<OUT>(dataStream.mergedStreams.get(i)));
+			}
+		}
+
 	}
 
 	/**
@@ -218,15 +229,26 @@ public abstract class DataStream<OUT> {
 	 *            The DataStreams to merge output with.
 	 * @return The {@link MergedDataStream}.
 	 */
-	public MergedDataStream<OUT> merge(DataStream<OUT>... streams) {
-		MergedDataStream<OUT> returnStream = new MergedDataStream<OUT>(this);
+	public DataStream<OUT> merge(DataStream<OUT>... streams) {
+		DataStream<OUT> returnStream = this.copy();
 
 		for (DataStream<OUT> stream : streams) {
-			returnStream.addConnection(stream);
+			for (DataStream<OUT> ds : stream.mergedStreams) {
+				validateMerge(ds.getId());
+				returnStream.mergedStreams.add(ds.copy());
+			}
 		}
 		return returnStream;
 	}
 
+	private void validateMerge(String id) {
+		for (DataStream<OUT> ds : this.mergedStreams) {
+			if (ds.getId().equals(id)) {
+				throw new RuntimeException("A DataStream cannot be merged with itself");
+			}
+		}
+	}
+
 	/**
 	 * Creates a new {@link ConnectedDataStream} by connecting
 	 * {@link DataStream} outputs of different type with each other. The
@@ -529,12 +551,12 @@ public abstract class DataStream<OUT> {
 	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
 		checkFieldRange(positionToSum);
-		return aggregateAll((AggregationFunction<OUT>) SumAggregationFunction
-				.getSumFunction(positionToSum, getClassAtPos(positionToSum)));
+		return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
+				positionToSum, getClassAtPos(positionToSum)));
 	}
 
 	/**
-	 * Applies an aggregation that sums the data stream at the first position .
+	 * Syntactic sugar for sum(0)
 	 * 
 	 * @return The transformed DataStream.
 	 */
@@ -552,12 +574,11 @@ public abstract class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
 		checkFieldRange(positionToMin);
-		return aggregateAll(new MinAggregationFunction<OUT>(positionToMin));
+		return aggregate(new MinAggregationFunction<OUT>(positionToMin));
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum of the data stream at
-	 * the first position.
+	 * Syntactic sugar for min(0)
 	 * 
 	 * @return The transformed DataStream.
 	 */
@@ -575,12 +596,11 @@ public abstract class DataStream<OUT> {
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
 		checkFieldRange(positionToMax);
-		return aggregateAll(new MaxAggregationFunction<OUT>(positionToMax));
+		return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
 	}
 
 	/**
-	 * Applies an aggregation that gives the maximum of the data stream at the
-	 * first position.
+	 * Syntactic sugar for max(0)
 	 * 
 	 * @return The transformed DataStream.
 	 */
@@ -588,20 +608,14 @@ public abstract class DataStream<OUT> {
 		return max(0);
 	}
 
-	private SingleOutputStreamOperator<OUT, ?> aggregateAll(
-			AggregationFunction<OUT> aggregate) {
-		return aggregate(aggregate, new StreamReduceInvokable<OUT>(aggregate), "reduce");
-	}
+	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
-	SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate,
-			StreamReduceInvokable<OUT> invokable, String functionName) {
-		DataStream<OUT> inputStream = this.copy();
+		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = inputStream.addFunction(functionName,
-				aggregate, null, null, invokable);
-
-		this.jobGraphBuilder.setTypeWrappersFrom(inputStream.getId(), returnStream.getId());
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, null,
+				null, invokable);
 
+		this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
 		return returnStream;
 	}
 
@@ -1032,7 +1046,9 @@ public abstract class DataStream<OUT> {
 	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
 		DataStream<OUT> returnStream = this.copy();
 
-		returnStream.partitioner = partitioner;
+		for (DataStream<OUT> stream : returnStream.mergedStreams) {
+			stream.partitioner = partitioner;
+		}
 
 		return returnStream;
 	}
@@ -1051,14 +1067,9 @@ public abstract class DataStream<OUT> {
 	 *            Number of the type (used at co-functions)
 	 */
 	protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
-		if (inputStream instanceof MergedDataStream) {
-			for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
-				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
-						inputStream.userDefinedNames, inputStream.selectAll);
-			}
-		} else {
-			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
-					typeNumber, inputStream.userDefinedNames, inputStream.selectAll);
+		for (DataStream<X> stream : inputStream.mergedStreams) {
+			jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
+					inputStream.userDefinedNames, inputStream.selectAll);
 		}
 
 	}
@@ -1104,5 +1115,8 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return The copy
 	 */
-	protected abstract DataStream<OUT> copy();
+	protected DataStream<OUT> copy(){
+		return new DataStream<OUT>(this);
+	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 06bec0a..c5b010d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -22,47 +22,35 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
 import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.types.TypeInformation;
 
 /**
- * A GroupedDataStream represents a data stream which has been partitioned by
- * the given key in the values. Operators like {@link #reduce},
- * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream}.
+ * A GroupedDataStream represents a {@link DataStream} which has been
+ * partitioned by the given key in the values. Operators like {@link #reduce},
+ * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
+ * get additional functionality by the grouping.
  *
  * @param <OUT>
  *            The output type of the {@link GroupedDataStream}.
  */
-public class GroupedDataStream<OUT> {
+public class GroupedDataStream<OUT> extends DataStream<OUT> {
 
-	DataStream<OUT> dataStream;
 	int keyPosition;
 
 	protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
-		this.dataStream = dataStream.partitionBy(keyPosition);
+		super(dataStream.partitionBy(keyPosition));
 		this.keyPosition = keyPosition;
 	}
 
-	/**
-	 * Applies a reduce transformation on the grouped data stream grouped by the
-	 * given key position. The {@link ReduceFunction} will receive input values
-	 * based on the key value. Only input values with the same key will go to
-	 * the same reducer.The user can also extend {@link RichReduceFunction} to
-	 * gain access to other features provided by the {@link RichFuntion}
-	 * interface. Gets the output type.
-	 * 
-	 * @return The output type.
-	 */
-	public TypeInformation<OUT> getOutputType() {
-		return dataStream.getOutputType();
+	protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
+		super(dataStream);
+		this.keyPosition = dataStream.keyPosition;
 	}
 
 	/**
@@ -79,7 +67,7 @@ public class GroupedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return dataStream.addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+		return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
 				ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
 	}
@@ -129,7 +117,7 @@ public class GroupedDataStream<OUT> {
 	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
 			long batchSize, long slideSize) {
 
-		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
 				GroupReduceFunction.class, 1), new BatchGroupReduceInvokable<OUT, R>(reducer,
 				batchSize, slideSize, keyPosition));
@@ -204,7 +192,7 @@ public class GroupedDataStream<OUT> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
 			long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
-		return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
 				GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
 				GroupReduceFunction.class, 1), new WindowGroupReduceInvokable<OUT, R>(reducer,
 				windowSize, slideInterval, keyPosition, timestamp));
@@ -219,22 +207,8 @@ public class GroupedDataStream<OUT> {
 	 *            The position in the data point to sum
 	 * @return The transformed DataStream.
 	 */
-	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
-		dataStream.checkFieldRange(positionToSum);
-		return aggregateGroup((AggregationFunction<OUT>) SumAggregationFunction
-				.getSumFunction(positionToSum, dataStream.getClassAtPos(positionToSum)));
-	}
-
-	/**
-	 * Applies an aggregation that sums the grouped data stream at the first
-	 * position, grouped by the given key position. Input values with the same
-	 * key will be summed.
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum() {
-		return sum(0);
+		return super.sum(positionToSum);
 	}
 
 	/**
@@ -247,21 +221,7 @@ public class GroupedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
-		dataStream.checkFieldRange(positionToMin);
-		return aggregateGroup(new MinAggregationFunction<OUT>(positionToMin));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum of the grouped data stream
-	 * at the first position, grouped by the given key position. Input values
-	 * with the same key will be minimized.
-	 * 
-	 * @param positionToMin
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> min() {
-		return min(0);
+		return super.min(positionToMin);
 	}
 
 	/**
@@ -274,24 +234,29 @@ public class GroupedDataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
-		dataStream.checkFieldRange(positionToMax);
-		return aggregateGroup(new MaxAggregationFunction<OUT>(positionToMax));
+		return super.max(positionToMax);
 	}
 
-	/**
-	 * Applies an aggregation that gives the maximum of the grouped data stream
-	 * at the first position, grouped by the given key position. Input values
-	 * with the same key will be maximized.
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max() {
-		return max(0);
+	@Override
+	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
+
+		GroupReduceInvokable<OUT> invokable = new GroupReduceInvokable<OUT>(aggregate, keyPosition);
+
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
+				null, null, invokable);
+
+		this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
+		return returnStream;
+	}
+
+	@Override
+	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+		System.out.println("Setting the partitioning after groupBy can affect the grouping");
+		return super.setConnectionType(partitioner);
 	}
 
-	private SingleOutputStreamOperator<OUT, ?> aggregateGroup(
-			AggregationFunction<OUT> aggregate) {
-		return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate,
-				keyPosition), "groupReduce");
+	@Override
+	protected GroupedDataStream<OUT> copy() {
+		return new GroupedDataStream<OUT>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 573dffd..1450ba6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -89,16 +89,10 @@ public class IterativeDataStream<IN> extends
 
 		List<String> name = Arrays.asList(new String[] { iterationName });
 
-		if (iterationTail instanceof MergedDataStream) {
-			for (DataStream<IN> stream : ((MergedDataStream<IN>) iterationTail).mergedStreams) {
-				String inputID = stream.getId();
-				jobGraphBuilder.setEdge(inputID, returnStream.getId(),
-						new ForwardPartitioner<IN>(), 0, name, false);
-			}
-		} else {
-
-			jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
-					new ForwardPartitioner<IN>(), 0, name, false);
+		for (DataStream<IN> stream : iterationTail.mergedStreams) {
+			String inputID = stream.getId();
+			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(), 0,
+					name, false);
 		}
 
 		return iterationTail;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
deleted file mode 100755
index c1618b2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-
-/**
- * The MergedDataStream represents a DataStream which consists of merged outputs
- * of DataStreams of the same type. Operators applied on this will transform all
- * the merged outputs jointly.
- *
- * @param <OUT>
- *            Type of the output.
- */
-public class MergedDataStream<OUT> extends DataStream<OUT> {
-
-	protected List<DataStream<OUT>> mergedStreams;
-
-	protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType,
-			TypeSerializerWrapper<OUT> outTypeWrapper) {
-		super(environment, operatorType, outTypeWrapper);
-		this.mergedStreams = new ArrayList<DataStream<OUT>>();
-		this.mergedStreams.add(this);
-	}
-
-	protected MergedDataStream(DataStream<OUT> dataStream) {
-		super(dataStream);
-		mergedStreams = new ArrayList<DataStream<OUT>>();
-		if (dataStream instanceof MergedDataStream) {
-			for (DataStream<OUT> stream : ((MergedDataStream<OUT>) dataStream).mergedStreams) {
-				mergedStreams.add(stream);
-			}
-		} else {
-			this.mergedStreams.add(this);
-		}
-
-	}
-
-	protected void addConnection(DataStream<OUT> stream) {
-		if (stream instanceof MergedDataStream) {
-			MergedDataStream<OUT> mStream = (MergedDataStream<OUT>) stream;
-			for (DataStream<OUT> ds : mStream.mergedStreams) {
-				validateMerge(ds.id);
-				this.mergedStreams.add(ds.copy());
-			}
-		} else {
-			validateMerge(stream.id);
-			this.mergedStreams.add(stream.copy());
-		}
-	}
-
-	private void validateMerge(String id) {
-		for (DataStream<OUT> ds : this.mergedStreams) {
-			if (ds.id.equals(id)) {
-				throw new RuntimeException("A DataStream cannot be merged with itself");
-			}
-		}
-	}
-
-	@Override
-	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
-		MergedDataStream<OUT> returnStream = this.copy();
-
-		for (DataStream<OUT> stream : returnStream.mergedStreams) {
-			stream.partitioner = partitioner;
-		}
-
-		return returnStream;
-	}
-
-	@Override
-	protected MergedDataStream<OUT> copy() {
-		return new MergedDataStream<OUT>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
index 1a88095..bed3613 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 public class SlidingWindowStateIterator<T> implements BatchIterator<T> {
 
+	private static final long serialVersionUID = 1L;
+
 	private CircularFifoBuffer buffer;
 	// private StreamRecord<T> nextElement;