You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/23 22:41:37 UTC
flink git commit: [FLINK-2007] [streaming] Proper Delta policy
serialization
Repository: flink
Updated Branches:
refs/heads/master 24c4f8323 -> 7e4c56258
[FLINK-2007] [streaming] Proper Delta policy serialization
Closes #697
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e4c5625
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e4c5625
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e4c5625
Branch: refs/heads/master
Commit: 7e4c562585af700a77df3d621eb0e8b2617821a6
Parents: 24c4f83
Author: mbalassi <mb...@apache.org>
Authored: Tue May 19 15:51:34 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat May 23 22:40:22 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 2 +
.../api/datastream/WindowedDataStream.java | 1 +
.../streaming/api/windowing/helper/Count.java | 2 +-
.../streaming/api/windowing/helper/Delta.java | 43 ++++++++++------
.../api/windowing/helper/FullStream.java | 2 +-
.../streaming/api/windowing/helper/Time.java | 2 +-
.../api/windowing/helper/WindowingHelper.java | 28 ++++++++--
.../api/windowing/policy/DeltaPolicy.java | 54 ++++++++++++++++----
.../api/complex/ComplexIntegrationTest.java | 15 ++++--
.../api/windowing/policy/DeltaPolicyTest.java | 51 +++++++++---------
10 files changed, 140 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index dbb9b05..5165ec7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -941,6 +941,7 @@ public class DataStream<OUT> {
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public WindowedDataStream<OUT> window(WindowingHelper policyHelper) {
+ policyHelper.setExecutionConfig(getExecutionConfig());
return new WindowedDataStream<OUT>(this, policyHelper);
}
@@ -972,6 +973,7 @@ public class DataStream<OUT> {
*/
@SuppressWarnings("rawtypes")
public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+ policyHelper.setExecutionConfig(getExecutionConfig());
return window(FullStream.window()).every(policyHelper);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index fd11d94..a10c79e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -149,6 +149,7 @@ public class WindowedDataStream<OUT> {
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
+ policyHelper.setExecutionConfig(getExecutionConfig());
WindowedDataStream<OUT> ret = this.copy();
if (ret.evictionHelper == null) {
ret.evictionHelper = ret.triggerHelper;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index 840546f..3266a24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
* {@link Count#of(int)} to get an instance.
*/
@SuppressWarnings("rawtypes")
-public class Count implements WindowingHelper {
+public class Count extends WindowingHelper {
private int count;
private int deleteOnEviction = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 5434a4e..bcb548f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -17,6 +17,9 @@
package org.apache.flink.streaming.api.windowing.helper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -30,23 +33,23 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
* the data type handled by the delta function represented by this
* helper.
*/
-public class Delta<DATA> implements WindowingHelper<DATA> {
+public class Delta<DATA> extends WindowingHelper<DATA> {
private DeltaFunction<DATA> deltaFunction;
private DATA initVal;
private double threshold;
+ private TypeSerializer<DATA> typeSerializer;
/**
* Creates a delta helper representing a delta count or eviction policy
- *
* @param deltaFunction
- * The delta function which should be used to calculate the delta
- * between points.
+ * The delta function which should be used to calculate the delta
+ * points.
* @param initVal
- * The initial value which will be used to calculate the first
- * delta.
+ * The initial value which will be used to calculate the first
+ * delta.
* @param threshold
- * The threshold used by the delta function.
+ * The threshold used by the delta function.
*/
public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
this.deltaFunction = deltaFunction;
@@ -56,12 +59,14 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
@Override
public EvictionPolicy<DATA> toEvict() {
- return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+ instantiateTypeSerializer();
+ return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
}
@Override
public TriggerPolicy<DATA> toTrigger() {
- return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+ instantiateTypeSerializer();
+ return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
}
/**
@@ -73,19 +78,27 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
* buffer and removes all elements from the buffer which have a higher delta
* then the threshold. As soon as there is an element with a lower delta,
* the eviction stops.
- *
+ *
* @param deltaFunction
- * The delta function which should be used to calculate the delta
- * between points.
+ * The delta function which should be used to calculate the delta
+ * points.
* @param initVal
- * The initial value which will be used to calculate the first
- * delta.
+ * The initial value which will be used to calculate the first
+ * delta.
* @param threshold
- * The threshold used by the delta function.
+ * The threshold used by the delta function.
* @return Helper representing a delta trigger or eviction policy
*/
public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
DATA initVal) {
return new Delta<DATA>(deltaFunction, initVal, threshold);
}
+
+ private void instantiateTypeSerializer(){
+ if (executionConfig == null){
+ throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");
+ }
+ TypeInformation typeInformation = TypeExtractor.getForObject(initVal);
+ typeSerializer = typeInformation.createSerializer(executionConfig);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
index 3508b26..7773d9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
* policy and only with operations that support pre-aggregator such as reduce or
* aggregations.
*/
-public class FullStream<DATA> implements WindowingHelper<DATA>, Serializable {
+public class FullStream<DATA> extends WindowingHelper<DATA> implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index da8d929..0089d26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
* The data type which is handled by the time stamp used in the
* policy represented by this helper
*/
-public class Time<DATA> implements WindowingHelper<DATA> {
+public class Time<DATA> extends WindowingHelper<DATA> {
protected long length;
protected TimeUnit granularity;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
index 9df8432..17e142a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.windowing.helper;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
@@ -30,10 +31,31 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
* @see Time
* @see Delta
*/
-public interface WindowingHelper<DATA> {
+public abstract class WindowingHelper<DATA> {
- public EvictionPolicy<DATA> toEvict();
+ /**
+ * Provides information for initial value serialization
+ * in {@link Delta}, unused in other subclasses.
+ */
+ protected ExecutionConfig executionConfig;
- public TriggerPolicy<DATA> toTrigger();
+ /**
+ * Method for encapsulating the {@link EvictionPolicy}.
+ * @return the eviction policy
+ */
+ public abstract EvictionPolicy<DATA> toEvict();
+ /**
+ * Method for encapsulating the {@link TriggerPolicy}.
+ * @return the trigger policy
+ */
+ public abstract TriggerPolicy<DATA> toTrigger();
+
+ /**
+ * Setter for the {@link ExecutionConfig} field.
+ * @param executionConfig Desired value
+ */
+ public final void setExecutionConfig(ExecutionConfig executionConfig){
+ this.executionConfig = executionConfig;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 93fc636..69dd66f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -17,9 +17,15 @@
package org.apache.flink.streaming.api.windowing.policy;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.LinkedList;
import java.util.List;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
/**
@@ -47,34 +53,45 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
*/
private static final long serialVersionUID = -7797538922123394967L;
+ //Used for serializing the threshold
+ private final static int INITIAL_SERIALIZER_BYTES = 1024;
+
protected DeltaFunction<DATA> deltaFuntion;
private List<DATA> windowBuffer;
protected double threshold;
- protected DATA triggerDataPoint;
+ private TypeSerializer<DATA> typeSerializer;
+ protected transient DATA triggerDataPoint;
/**
- * Crates a delta policy which calculates a delta between the data point
+ * Creates a delta policy which calculates a delta between the data point
* which triggered last and the currently arrived data point. It triggers if
- * the delta is higher than a specified threshold.
- *
+ * the delta is higher than a specified threshold. As the data may be sent to
+ * the cluster a {@link TypeSerializer} is needed for the initial value.
+ *
+ * <p>
* In case it gets used for eviction, this policy starts from the first
* element of the buffer and removes all elements from the buffer which have
* a higher delta then the threshold. As soon as there is an element with a
* lower delta, the eviction stops.
- *
+ * </p>
+ *
* @param deltaFuntion
- * The delta function to be used.
+ * The delta function to be used.
* @param init
- * The initial to be used for the calculation of a delta before
- * the first trigger.
+ * The initial to be used for the calculation of a delta before
+ * the first trigger.
* @param threshold
- * The threshold upon which a triggering should happen.
+ * The threshold upon which a triggering should happen.
+ * @param typeSerializer
+ * TypeSerializer to properly forward the initial value to
+ * the cluster
*/
- public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold) {
+ public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
this.deltaFuntion = deltaFuntion;
this.triggerDataPoint = init;
this.windowBuffer = new LinkedList<DATA>();
this.threshold = threshold;
+ this.typeSerializer = typeSerializer;
}
@Override
@@ -107,7 +124,7 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
@Override
public DeltaPolicy<DATA> clone() {
- return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold);
+ return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold, typeSerializer);
}
@Override
@@ -131,4 +148,19 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
public String toString() {
return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
}
+
+ private void writeObject(ObjectOutputStream stream) throws IOException{
+ stream.defaultWriteObject();
+ DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(INITIAL_SERIALIZER_BYTES);
+ typeSerializer.serialize(triggerDataPoint, dataOutputSerializer);
+ stream.write(dataOutputSerializer.getByteArray());
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
+ stream.defaultReadObject();
+ byte[] bytes = new byte[stream.available()];
+ stream.readFully(bytes);
+ triggerDataPoint = typeSerializer.deserialize(new DataInputDeserializer(bytes, 0, bytes.length));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 7eb263a..738654a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,6 +36,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -611,10 +613,15 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
}
- private static class RectangleSource implements SourceFunction<RectangleClass> {
+ private static class RectangleSource extends RichSourceFunction<RectangleClass> {
private static final long serialVersionUID = 1L;
- RectangleClass rectangle = new RectangleClass(100, 100);
- int cnt = 0;
+ private transient RectangleClass rectangle;
+ private transient int cnt;
+
+ public void open(Configuration parameters) throws Exception {
+ rectangle = new RectangleClass(100, 100);
+ cnt = 0;
+ }
@Override
public boolean reachedEnd() throws Exception {
@@ -764,7 +771,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
}
- public static class RectangleClass implements Serializable {
+ public static class RectangleClass {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4c5625/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
index 52a4d13..448377d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
@@ -1,25 +1,25 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.flink.streaming.api.windowing.policy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
import org.junit.Test;
import java.util.List;
@@ -29,6 +29,9 @@ import static org.junit.Assert.*;
public class DeltaPolicyTest {
+ //Dummy serializer, this is not used because the tests are done locally
+ private final static TypeSerializer<Tuple2<Integer, Integer>> SERIALIZER = null;
+
@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
@Test
public void testDelta() {
@@ -38,7 +41,7 @@ public class DeltaPolicyTest {
Tuple2<Integer, Integer> newDataPoint) {
return (double) newDataPoint.f0 - oldDataPoint.f0;
}
- }, new Tuple2(0, 0), 2);
+ }, new Tuple2(0, 0), 2, SERIALIZER);
List<Tuple2> tuples = Arrays.asList(new Tuple2(1, 0), new Tuple2(2, 0), new Tuple2(3, 0),
new Tuple2(6, 0));
@@ -70,16 +73,16 @@ public class DeltaPolicyTest {
};
assertEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
- 0), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 0), 2));
+ 0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
+ 0, 0), 2, SERIALIZER));
assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 1), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
- new Tuple2<Integer, Integer>(0, 0), 2));
-
+ 0, 1), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df,
+ new Tuple2<Integer, Integer>(0, 0), 2, SERIALIZER));
+
assertNotEquals(new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(0,
- 0), 2), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
- 0, 0), 3));
+ 0), 2, SERIALIZER), new DeltaPolicy<Tuple2<Integer, Integer>>(df, new Tuple2<Integer, Integer>(
+ 0, 0), 3, SERIALIZER));
}
}
\ No newline at end of file