You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/23 22:41:37 UTC

flink git commit: [FLINK-2007] [streaming] Proper Delta policy serialization

Repository: flink
Updated Branches:
  refs/heads/master 24c4f8323 -> 7e4c56258


[FLINK-2007] [streaming] Proper Delta policy serialization

Closes #697


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e4c5625
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e4c5625
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e4c5625

Branch: refs/heads/master
Commit: 7e4c562585af700a77df3d621eb0e8b2617821a6
Parents: 24c4f83
Author: mbalassi <mb...@apache.org>
Authored: Tue May 19 15:51:34 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat May 23 22:40:22 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  2 +
 .../api/datastream/WindowedDataStream.java      |  1 +
 .../streaming/api/windowing/helper/Count.java   |  2 +-
 .../streaming/api/windowing/helper/Delta.java   | 43 ++++++++++------
 .../api/windowing/helper/FullStream.java        |  2 +-
 .../streaming/api/windowing/helper/Time.java    |  2 +-
 .../api/windowing/helper/WindowingHelper.java   | 28 ++++++++--
 .../api/windowing/policy/DeltaPolicy.java       | 54 ++++++++++++++++----
 .../api/complex/ComplexIntegrationTest.java     | 15 ++++--
 .../api/windowing/policy/DeltaPolicyTest.java   | 51 +++++++++---------
 10 files changed, 140 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index dbb9b05..5165ec7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -941,6 +941,7 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public WindowedDataStream<OUT> window(WindowingHelper policyHelper) {
+		policyHelper.setExecutionConfig(getExecutionConfig());
 		return new WindowedDataStream<OUT>(this, policyHelper);
 	}
 
@@ -972,6 +973,7 @@ public class DataStream<OUT> {
 	 */
 	@SuppressWarnings("rawtypes")
 	public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+		policyHelper.setExecutionConfig(getExecutionConfig());
 		return window(FullStream.window()).every(policyHelper);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index fd11d94..a10c79e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -149,6 +149,7 @@ public class WindowedDataStream<OUT> {
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+		policyHelper.setExecutionConfig(getExecutionConfig());
 		WindowedDataStream<OUT> ret = this.copy();
 		if (ret.evictionHelper == null) {
 			ret.evictionHelper = ret.triggerHelper;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index 840546f..3266a24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  * {@link Count#of(int)} to get an instance.
  */
 @SuppressWarnings("rawtypes")
-public class Count implements WindowingHelper {
+public class Count extends WindowingHelper {
 
 	private int count;
 	private int deleteOnEviction = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 5434a4e..bcb548f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.windowing.helper;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -30,23 +33,23 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  *            the data type handled by the delta function represented by this
  *            helper.
  */
-public class Delta<DATA> implements WindowingHelper<DATA> {
+public class Delta<DATA> extends WindowingHelper<DATA> {
 
 	private DeltaFunction<DATA> deltaFunction;
 	private DATA initVal;
 	private double threshold;
+	private TypeSerializer<DATA> typeSerializer;
 
 	/**
 	 * Creates a delta helper representing a delta count or eviction policy
-	 * 
 	 * @param deltaFunction
-	 *            The delta function which should be used to calculate the delta
-	 *            between points.
+	 *				The delta function which should be used to calculate the delta
+	 *				between points.
 	 * @param initVal
-	 *            The initial value which will be used to calculate the first
-	 *            delta.
+	 *				The initial value which will be used to calculate the first
+	 *				delta.
 	 * @param threshold
-	 *            The threshold used by the delta function.
+	 * 				The threshold used by the delta function.
 	 */
 	public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
 		this.deltaFunction = deltaFunction;
@@ -56,12 +59,14 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
 
 	@Override
 	public EvictionPolicy<DATA> toEvict() {
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+		instantiateTypeSerializer();
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
 	}
 
 	@Override
 	public TriggerPolicy<DATA> toTrigger() {
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+		instantiateTypeSerializer();
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
 	}
 
 	/**
@@ -73,19 +78,27 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
 	 * buffer and removes all elements from the buffer which have a higher delta
 	 * then the threshold. As soon as there is an element with a lower delta,
 	 * the eviction stops.
-	 * 
+	 *
 	 * @param deltaFunction
-	 *            The delta function which should be used to calculate the delta
-	 *            between points.
+	 *				The delta function which should be used to calculate the delta
+	 *				between points.
 	 * @param initVal
-	 *            The initial value which will be used to calculate the first
-	 *            delta.
+	 *				The initial value which will be used to calculate the first
+	 *				delta.
 	 * @param threshold
-	 *            The threshold used by the delta function.
+	 * 				The threshold used by the delta function.
 	 * @return Helper representing a delta trigger or eviction policy
 	 */
 	public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
 			DATA initVal) {
 		return new Delta<DATA>(deltaFunction, initVal, threshold);
 	}
+
+	private void instantiateTypeSerializer(){
+		if (executionConfig == null){
+			throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");
+		}
+		TypeInformation typeInformation = TypeExtractor.getForObject(initVal);
+		typeSerializer = typeInformation.createSerializer(executionConfig);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
index 3508b26..7773d9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  * policy and only with operations that support pre-aggregator such as reduce or
  * aggregations.
  */
-public class FullStream<DATA> implements WindowingHelper<DATA>, Serializable {
+public class FullStream<DATA> extends WindowingHelper<DATA> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index da8d929..0089d26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  *            The data type which is handled by the time stamp used in the
  *            policy represented by this helper
  */
-public class Time<DATA> implements WindowingHelper<DATA> {
+public class Time<DATA> extends WindowingHelper<DATA> {
 
 	protected long length;
 	protected TimeUnit granularity;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
index 9df8432..17e142a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.windowing.helper;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 
@@ -30,10 +31,31 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  * @see Time
  * @see Delta
  */
-public interface WindowingHelper<DATA> {
+public abstract class WindowingHelper<DATA> {
 
-	public EvictionPolicy<DATA> toEvict();
+	/**
+	 * Provides information for initial value serialization
+	 * in {@link Delta}, unused in other subclasses.
+	 */
+	protected ExecutionConfig executionConfig;
 
-	public TriggerPolicy<DATA> toTrigger();
+	/**
+	 * Method for encapsulating the {@link EvictionPolicy}.
+	 * @return the eviction policy
+	 */
+	public abstract EvictionPolicy<DATA> toEvict();
 
+	/**
+	 * Method for encapsulating the {@link TriggerPolicy}.
+	 * @return the trigger policy
+	 */
+	public abstract TriggerPolicy<DATA> toTrigger();
+
+	/**
+	 * Setter for the {@link ExecutionConfig} field.
+	 * @param executionConfig Desired value
+	 */
+	public final void setExecutionConfig(ExecutionConfig executionConfig){
+		this.executionConfig = executionConfig;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 93fc636..69dd66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -17,9 +17,15 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 
 /**
@@ -47,34 +53,45 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
 	 */
 	private static final long serialVersionUID = -7797538922123394967L;
 
+	//Initial buffer size used for serializing the trigger data point
+	private final static int INITIAL_SERIALIZER_BYTES = 1024;
+
 	protected DeltaFunction<DATA> deltaFuntion;
 	private List<DATA> windowBuffer;
 	protected double threshold;
-	protected DATA triggerDataPoint;
+	private TypeSerializer<DATA> typeSerializer;
+	protected transient DATA triggerDataPoint;
 
 	/**
-	 * Crates a delta policy which calculates a delta between the data point
+	 * Creates a delta policy which calculates a delta between the data point
 	 * which triggered last and the currently arrived data point. It triggers if
-	 * the delta is higher than a specified threshold.
-	 * 
+	 * the delta is higher than a specified threshold. As the data may be sent to
+	 * the cluster a {@link TypeSerializer} is needed for the initial value.
+	 *
+	 * <p>
 	 * In case it gets used for eviction, this policy starts from the first
 	 * element of the buffer and removes all elements from the buffer which have
 	 * a higher delta then the threshold. As soon as there is an element with a
 	 * lower delta, the eviction stops.
-	 * 
+	 * </p>
+	 *
 	 * @param deltaFuntion
-	 *            The delta function to be used.
+	 * 				The delta function to be used.
 	 * @param init
-	 *            The initial to be used for the calculation of a delta before
-	 *            the first trigger.
+	 *				The initial value to be used for the calculation of a delta before
+	 *				the first trigger.
 	 * @param threshold
-	 *            The threshold upon which a triggering should happen.
+	 * 				The threshold upon which a triggering should happen.
+	 * @param typeSerializer
+	 * 				TypeSerializer to properly forward the initial value to
+	 * 				the cluster
 	 */
-	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold) {
+	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
 		this.deltaFuntion = deltaFuntion;
 		this.triggerDataPoint = init;
 		this.windowBuffer = new LinkedList<DATA>();
 		this.threshold = threshold;
+		this.typeSerializer = typeSerializer;
 	}
 
 	@Override
@@ -107,7 +124,7 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
 
 	@Override
 	public DeltaPolicy<DATA> clone() {
-		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold);
+		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold, typeSerializer);
 	}
 
 	@Override
@@ -131,4 +148,19 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
 	public String toString() {
 		return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
 	}
+
+	private void writeObject(ObjectOutputStream stream) throws IOException{
+		stream.defaultWriteObject();
+		DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(INITIAL_SERIALIZER_BYTES);
+		typeSerializer.serialize(triggerDataPoint, dataOutputSerializer);
+		stream.write(dataOutputSerializer.getByteArray());
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
+		stream.defaultReadObject();
+		byte[] bytes = new byte[stream.available()];
+		stream.readFully(bytes);
+		triggerDataPoint = typeSerializer.deserialize(new DataInputDeserializer(bytes, 0, bytes.length));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 7eb263a..738654a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -611,10 +613,15 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	private static class RectangleSource implements SourceFunction<RectangleClass> {
+	private static class RectangleSource extends RichSourceFunction<RectangleClass> {
 		private static final long serialVersionUID = 1L;
-		RectangleClass rectangle = new RectangleClass(100, 100);
-		int cnt = 0;
+		private transient RectangleClass rectangle;
+		private transient int cnt;
+
+		public void open(Configuration parameters) throws Exception {
+			rectangle = new RectangleClass(100, 100);
+			cnt = 0;
+		}
 
 		@Override
 		public boolean reachedEnd() throws Exception {
@@ -764,7 +771,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static class RectangleClass implements Serializable {
+	public static class RectangleClass {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
index 52a4d13..448377d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
@@ -1,25 +1,25 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.flink.streaming.api.windowing.policy;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
 import org.junit.Test;
 
 import java.util.List;
@@ -29,6 +29,9 @@ import static org.junit.Assert.*;
 
 public class DeltaPolicyTest {
 
+	//Dummy serializer, this is not used because the tests are done locally
+	private final static TypeSerializer<Tuple2<Integer, Integer>> SERIALIZER = null;
+
 	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
 	@Test
 	public void testDelta() {
@@ -38,7 +41,7 @@ public class DeltaPolicyTest {
 					Tuple2<Integer, Integer> newDataPoint) {
 				return (double) newDataPoint.f0 - oldDataPoint.f0;
 			}
-		}, new Tuple2(0, 0), 2);
+		}, new Tuple2(0, 0), 2, SERIALIZER);
 
 		List<Tuple2> tuples = Arrays.asList(new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0),
 				new Tuple2(6, 0));
@@ -70,16 +73,16 @@ public class DeltaPolicyTest {
 		};
 
 		assertEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
-				0), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 0), 2));
+				0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
+				0, 0), 2, SERIALIZER));
 
 		assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 1), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
-				new Tuple2<Integer, Integer>(0, 0), 2));
-		
+				0, 1), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
+				new Tuple2<Integer, Integer>(0, 0), 2, SERIALIZER));
+
 		assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
-				0), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
-				0, 0), 3));
+				0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
+				0, 0), 3, SERIALIZER));
 
 	}
 }
\ No newline at end of file