You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:53 UTC
[10/18] git commit: [streaming] DataStream type refactor for easier
future extensions
[streaming] DataStream type refactor for easier future extensions
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9cbd68a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9cbd68a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9cbd68a8
Branch: refs/heads/master
Commit: 9cbd68a8430cb4da4e63275974a3626d596896c2
Parents: 13a9277
Author: gyfora <gy...@gmail.com>
Authored: Sat Sep 6 18:14:07 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 82 +++++++++------
.../api/datastream/GroupedDataStream.java | 105 +++++++------------
.../api/datastream/IterativeDataStream.java | 14 +--
.../api/datastream/MergedDataStream.java | 96 -----------------
.../state/SlidingWindowStateIterator.java | 2 +
5 files changed, 89 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d78ceae..bd5b83c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -81,7 +81,7 @@ import org.apache.flink.types.TypeInformation;
* The type of the DataStream, i.e., the type of the elements of the
* DataStream.
*/
-public abstract class DataStream<OUT> {
+public class DataStream<OUT> {
protected static Integer counter = 0;
protected final StreamExecutionEnvironment environment;
@@ -91,6 +91,7 @@ public abstract class DataStream<OUT> {
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
protected TypeSerializerWrapper<OUT> outTypeWrapper;
+ protected List<DataStream<OUT>> mergedStreams;
protected final JobGraphBuilder jobGraphBuilder;
@@ -120,6 +121,8 @@ public abstract class DataStream<OUT> {
this.selectAll = false;
this.partitioner = new ForwardPartitioner<OUT>();
this.outTypeWrapper = outTypeWrapper;
+ this.mergedStreams = new ArrayList<DataStream<OUT>>();
+ this.mergedStreams.add(this);
}
/**
@@ -137,6 +140,14 @@ public abstract class DataStream<OUT> {
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
this.outTypeWrapper = dataStream.outTypeWrapper;
+ this.mergedStreams = new ArrayList<DataStream<OUT>>();
+ this.mergedStreams.add(this);
+ if (dataStream.mergedStreams.size() > 1) {
+ for (int i = 1; i < dataStream.mergedStreams.size(); i++) {
+ this.mergedStreams.add(new DataStream<OUT>(dataStream.mergedStreams.get(i)));
+ }
+ }
+
}
/**
@@ -218,15 +229,26 @@ public abstract class DataStream<OUT> {
* The DataStreams to merge output with.
* @return The {@link MergedDataStream}.
*/
- public MergedDataStream<OUT> merge(DataStream<OUT>... streams) {
- MergedDataStream<OUT> returnStream = new MergedDataStream<OUT>(this);
+ public DataStream<OUT> merge(DataStream<OUT>... streams) {
+ DataStream<OUT> returnStream = this.copy();
for (DataStream<OUT> stream : streams) {
- returnStream.addConnection(stream);
+ for (DataStream<OUT> ds : stream.mergedStreams) {
+ validateMerge(ds.getId());
+ returnStream.mergedStreams.add(ds.copy());
+ }
}
return returnStream;
}
+ private void validateMerge(String id) {
+ for (DataStream<OUT> ds : this.mergedStreams) {
+ if (ds.getId().equals(id)) {
+ throw new RuntimeException("A DataStream cannot be merged with itself");
+ }
+ }
+ }
+
/**
* Creates a new {@link ConnectedDataStream} by connecting
* {@link DataStream} outputs of different type with each other. The
@@ -529,12 +551,12 @@ public abstract class DataStream<OUT> {
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
checkFieldRange(positionToSum);
- return aggregateAll((AggregationFunction<OUT>) SumAggregationFunction
- .getSumFunction(positionToSum, getClassAtPos(positionToSum)));
+ return aggregate((AggregationFunction<OUT>) SumAggregationFunction.getSumFunction(
+ positionToSum, getClassAtPos(positionToSum)));
}
/**
- * Applies an aggregation that sums the data stream at the first position .
+ * Syntactic sugar for sum(0)
*
* @return The transformed DataStream.
*/
@@ -552,12 +574,11 @@ public abstract class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
checkFieldRange(positionToMin);
- return aggregateAll(new MinAggregationFunction<OUT>(positionToMin));
+ return aggregate(new MinAggregationFunction<OUT>(positionToMin));
}
/**
- * Applies an aggregation that that gives the minimum of the data stream at
- * the first position.
+ * Syntactic sugar for min(0)
*
* @return The transformed DataStream.
*/
@@ -575,12 +596,11 @@ public abstract class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
checkFieldRange(positionToMax);
- return aggregateAll(new MaxAggregationFunction<OUT>(positionToMax));
+ return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
}
/**
- * Applies an aggregation that gives the maximum of the data stream at the
- * first position.
+ * Syntactic sugar for max(0)
*
* @return The transformed DataStream.
*/
@@ -588,20 +608,14 @@ public abstract class DataStream<OUT> {
return max(0);
}
- private SingleOutputStreamOperator<OUT, ?> aggregateAll(
- AggregationFunction<OUT> aggregate) {
- return aggregate(aggregate, new StreamReduceInvokable<OUT>(aggregate), "reduce");
- }
+ protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
- SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate,
- StreamReduceInvokable<OUT> invokable, String functionName) {
- DataStream<OUT> inputStream = this.copy();
+ StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
- SingleOutputStreamOperator<OUT, ?> returnStream = inputStream.addFunction(functionName,
- aggregate, null, null, invokable);
-
- this.jobGraphBuilder.setTypeWrappersFrom(inputStream.getId(), returnStream.getId());
+ SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, null,
+ null, invokable);
+ this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
return returnStream;
}
@@ -1032,7 +1046,9 @@ public abstract class DataStream<OUT> {
protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
DataStream<OUT> returnStream = this.copy();
- returnStream.partitioner = partitioner;
+ for (DataStream<OUT> stream : returnStream.mergedStreams) {
+ stream.partitioner = partitioner;
+ }
return returnStream;
}
@@ -1051,14 +1067,9 @@ public abstract class DataStream<OUT> {
* Number of the type (used at co-functions)
*/
protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
- if (inputStream instanceof MergedDataStream) {
- for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
- jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
- inputStream.userDefinedNames, inputStream.selectAll);
- }
- } else {
- jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
- typeNumber, inputStream.userDefinedNames, inputStream.selectAll);
+ for (DataStream<X> stream : inputStream.mergedStreams) {
+ jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
+ inputStream.userDefinedNames, inputStream.selectAll);
}
}
@@ -1104,5 +1115,8 @@ public abstract class DataStream<OUT> {
*
* @return The copy
*/
- protected abstract DataStream<OUT> copy();
+ protected DataStream<OUT> copy(){
+ return new DataStream<OUT>(this);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 06bec0a..c5b010d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -22,47 +22,35 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
import org.apache.flink.streaming.api.invokable.util.Timestamp;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.types.TypeInformation;
/**
- * A GroupedDataStream represents a data stream which has been partitioned by
- * the given key in the values. Operators like {@link #reduce},
- * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream}.
+ * A GroupedDataStream represents a {@link DataStream} which has been
+ * partitioned by the given key in the values. Operators like {@link #reduce},
+ * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
+ * get additional functionality by the grouping.
*
* @param <OUT>
* The output type of the {@link GroupedDataStream}.
*/
-public class GroupedDataStream<OUT> {
+public class GroupedDataStream<OUT> extends DataStream<OUT> {
- DataStream<OUT> dataStream;
int keyPosition;
protected GroupedDataStream(DataStream<OUT> dataStream, int keyPosition) {
- this.dataStream = dataStream.partitionBy(keyPosition);
+ super(dataStream.partitionBy(keyPosition));
this.keyPosition = keyPosition;
}
- /**
- * Applies a reduce transformation on the grouped data stream grouped by the
- * given key position. The {@link ReduceFunction} will receive input values
- * based on the key value. Only input values with the same key will go to
- * the same reducer.The user can also extend {@link RichReduceFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface. Gets the output type.
- *
- * @return The output type.
- */
- public TypeInformation<OUT> getOutputType() {
- return dataStream.getOutputType();
+ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
+ super(dataStream);
+ this.keyPosition = dataStream.keyPosition;
}
/**
@@ -79,7 +67,7 @@ public class GroupedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return dataStream.addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
}
@@ -129,7 +117,7 @@ public class GroupedDataStream<OUT> {
public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
long batchSize, long slideSize) {
- return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
GroupReduceFunction.class, 1), new BatchGroupReduceInvokable<OUT, R>(reducer,
batchSize, slideSize, keyPosition));
@@ -204,7 +192,7 @@ public class GroupedDataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
long windowSize, long slideInterval, Timestamp<OUT> timestamp) {
- return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
GroupReduceFunction.class, 1), new WindowGroupReduceInvokable<OUT, R>(reducer,
windowSize, slideInterval, keyPosition, timestamp));
@@ -219,22 +207,8 @@ public class GroupedDataStream<OUT> {
* The position in the data point to sum
* @return The transformed DataStream.
*/
- @SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
- dataStream.checkFieldRange(positionToSum);
- return aggregateGroup((AggregationFunction<OUT>) SumAggregationFunction
- .getSumFunction(positionToSum, dataStream.getClassAtPos(positionToSum)));
- }
-
- /**
- * Applies an aggregation that sums the grouped data stream at the first
- * position, grouped by the given key position. Input values with the same
- * key will be summed.
- *
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> sum() {
- return sum(0);
+ return super.sum(positionToSum);
}
/**
@@ -247,21 +221,7 @@ public class GroupedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
- dataStream.checkFieldRange(positionToMin);
- return aggregateGroup(new MinAggregationFunction<OUT>(positionToMin));
- }
-
- /**
- * Applies an aggregation that gives the minimum of the grouped data stream
- * at the first position, grouped by the given key position. Input values
- * with the same key will be minimized.
- *
- * @param positionToMin
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> min() {
- return min(0);
+ return super.min(positionToMin);
}
/**
@@ -274,24 +234,29 @@ public class GroupedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
- dataStream.checkFieldRange(positionToMax);
- return aggregateGroup(new MaxAggregationFunction<OUT>(positionToMax));
+ return super.max(positionToMax);
}
- /**
- * Applies an aggregation that gives the maximum of the grouped data stream
- * at the first position, grouped by the given key position. Input values
- * with the same key will be maximized.
- *
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<OUT, ?> max() {
- return max(0);
+ @Override
+ protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
+
+ GroupReduceInvokable<OUT> invokable = new GroupReduceInvokable<OUT>(aggregate, keyPosition);
+
+ SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
+ null, null, invokable);
+
+ this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
+ return returnStream;
+ }
+
+ @Override
+ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+ System.out.println("Setting the partitioning after groupBy can affect the grouping");
+ return super.setConnectionType(partitioner);
}
- private SingleOutputStreamOperator<OUT, ?> aggregateGroup(
- AggregationFunction<OUT> aggregate) {
- return this.dataStream.aggregate(aggregate, new GroupReduceInvokable<OUT>(aggregate,
- keyPosition), "groupReduce");
+ @Override
+ protected GroupedDataStream<OUT> copy() {
+ return new GroupedDataStream<OUT>(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 573dffd..1450ba6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -89,16 +89,10 @@ public class IterativeDataStream<IN> extends
List<String> name = Arrays.asList(new String[] { iterationName });
- if (iterationTail instanceof MergedDataStream) {
- for (DataStream<IN> stream : ((MergedDataStream<IN>) iterationTail).mergedStreams) {
- String inputID = stream.getId();
- jobGraphBuilder.setEdge(inputID, returnStream.getId(),
- new ForwardPartitioner<IN>(), 0, name, false);
- }
- } else {
-
- jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
- new ForwardPartitioner<IN>(), 0, name, false);
+ for (DataStream<IN> stream : iterationTail.mergedStreams) {
+ String inputID = stream.getId();
+ jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(), 0,
+ name, false);
}
return iterationTail;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
deleted file mode 100755
index c1618b2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
-
-/**
- * The MergedDataStream represents a DataStream which consists of merged outputs
- * of DataStreams of the same type. Operators applied on this will transform all
- * the merged outputs jointly.
- *
- * @param <OUT>
- * Type of the output.
- */
-public class MergedDataStream<OUT> extends DataStream<OUT> {
-
- protected List<DataStream<OUT>> mergedStreams;
-
- protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType,
- TypeSerializerWrapper<OUT> outTypeWrapper) {
- super(environment, operatorType, outTypeWrapper);
- this.mergedStreams = new ArrayList<DataStream<OUT>>();
- this.mergedStreams.add(this);
- }
-
- protected MergedDataStream(DataStream<OUT> dataStream) {
- super(dataStream);
- mergedStreams = new ArrayList<DataStream<OUT>>();
- if (dataStream instanceof MergedDataStream) {
- for (DataStream<OUT> stream : ((MergedDataStream<OUT>) dataStream).mergedStreams) {
- mergedStreams.add(stream);
- }
- } else {
- this.mergedStreams.add(this);
- }
-
- }
-
- protected void addConnection(DataStream<OUT> stream) {
- if (stream instanceof MergedDataStream) {
- MergedDataStream<OUT> mStream = (MergedDataStream<OUT>) stream;
- for (DataStream<OUT> ds : mStream.mergedStreams) {
- validateMerge(ds.id);
- this.mergedStreams.add(ds.copy());
- }
- } else {
- validateMerge(stream.id);
- this.mergedStreams.add(stream.copy());
- }
- }
-
- private void validateMerge(String id) {
- for (DataStream<OUT> ds : this.mergedStreams) {
- if (ds.id.equals(id)) {
- throw new RuntimeException("A DataStream cannot be merged with itself");
- }
- }
- }
-
- @Override
- protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
- MergedDataStream<OUT> returnStream = this.copy();
-
- for (DataStream<OUT> stream : returnStream.mergedStreams) {
- stream.partitioner = partitioner;
- }
-
- return returnStream;
- }
-
- @Override
- protected MergedDataStream<OUT> copy() {
- return new MergedDataStream<OUT>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9cbd68a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
index 1a88095..bed3613 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/SlidingWindowStateIterator.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class SlidingWindowStateIterator<T> implements BatchIterator<T> {
+ private static final long serialVersionUID = 1L;
+
private CircularFifoBuffer buffer;
// private StreamRecord<T> nextElement;