You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:12 UTC
[5/9] flink git commit: [FLINK-5590] [runtime] Add proper internal
state hierarchy
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 628d663..9b8af58 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.commons.math3.util.ArithmeticUtils;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
@@ -42,6 +42,9 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAppendingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -107,22 +110,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected final WindowAssigner<? super IN, W> windowAssigner;
- protected final KeySelector<IN, K> keySelector;
-
- protected final Trigger<? super IN, ? super W> trigger;
+ private final KeySelector<IN, K> keySelector;
- protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
+ private final Trigger<? super IN, ? super W> trigger;
- protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
+ private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
- /**
- * For serializing the key in checkpoints.
- */
+ /** For serializing the key in checkpoints. */
protected final TypeSerializer<K> keySerializer;
- /**
- * For serializing the window in checkpoints.
- */
+ /** For serializing the window in checkpoints. */
protected final TypeSerializer<W> windowSerializer;
/**
@@ -133,15 +130,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@code window.maxTimestamp + allowedLateness} landmark.
* </ul>
*/
- protected final long allowedLateness;
+ private final long allowedLateness;
// ------------------------------------------------------------------------
// State that is not checkpointed
// ------------------------------------------------------------------------
- /**
- * This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp.
- */
+ /** The state in which the window contents are stored. Each window is a namespace */
+ private transient InternalAppendingState<W, IN, ACC> windowState;
+
+ /** The {@link #windowState}, typed to merging state for merging windows.
+ * Null if the window state is not mergeable */
+ private transient InternalMergingState<W, IN, ACC> windowMergingState;
+
+ /** The state that holds the merging window metadata (the sets that describe what is merged) */
+ private transient InternalListState<VoidNamespace, Tuple2<W, W>> mergingSetsState;
+
+ /** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
protected transient TimestampedCollector<OUT> timestampedCollector;
protected transient Context context = new Context(null, null);
@@ -234,14 +239,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.allowedLateness = allowedLateness;
this.legacyWindowOperatorType = legacyWindowOperatorType;
- if (windowAssigner instanceof MergingWindowAssigner) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
- mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
- } else {
- mergingWindowsDescriptor = null;
- }
-
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -263,6 +260,43 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
};
+ // create (or restore) the state that holds the actual window contents
+ // NOTE - the state may be null in the case of the overriding evicting window operator
+ if (windowStateDescriptor != null) {
+ windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
+ }
+
+ // create the typed and helper states for merging windows
+ if (windowAssigner instanceof MergingWindowAssigner) {
+
+ // store a typed reference for the state of merging windows - sanity check
+ if (windowState instanceof InternalMergingState) {
+ windowMergingState = (InternalMergingState<W, IN, ACC>) windowState;
+ }
+ // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
+ // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
+ // TODO activate the sanity check once resolved
+// else if (windowState != null) {
+// throw new IllegalStateException(
+// "The window uses a merging assigner, but the window state is not mergeable.");
+// }
+
+ @SuppressWarnings("unchecked")
+ final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
+
+ final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
+ typedTuple,
+ new TypeSerializer[] {windowSerializer, windowSerializer} );
+
+ final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
+ new ListStateDescriptor<>("merging-window-set", tupleSerializer);
+
+ // get the state that stores the merging sets
+ mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>)
+ getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
+ mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ }
+
registerRestoredLegacyStateState();
}
@@ -283,12 +317,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
- @SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
- Collection<W> elementWindows = windowAssigner.assignWindows(
+ final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
-
- final K key = (K) getKeyedStateBackend().getCurrentKey();
+
+ final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
@@ -315,11 +348,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
// merge the merged state windows into the newly resulting state window
- getKeyedStateBackend().mergePartitionedStates(
- stateWindowResult,
- mergedStateWindows,
- windowSerializer,
- (StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
+ windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
@@ -334,8 +363,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
- AppendingState<IN, ACC> windowState =
- getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
context.key = key;
@@ -368,8 +396,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
continue;
}
- AppendingState<IN, ACC> windowState =
- getPartitionedState(window, windowSerializer, windowStateDescriptor);
+ windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
context.key = key;
@@ -399,8 +426,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.getKey();
context.window = timer.getNamespace();
- AppendingState<IN, ACC> windowState;
- MergingWindowSet<W> mergingWindows = null;
+ MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
@@ -411,12 +437,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// so it is safe to just ignore
return;
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+
+ windowState.setCurrentNamespace(stateWindow);
} else {
- windowState = getPartitionedState(
- context.window,
- windowSerializer,
- windowStateDescriptor);
+ windowState.setCurrentNamespace(context.window);
+ mergingWindows = null;
}
ACC contents = windowState.get();
@@ -440,8 +465,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context.key = timer.getKey();
context.window = timer.getNamespace();
- AppendingState<IN, ACC> windowState;
- MergingWindowSet<W> mergingWindows = null;
+ MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
@@ -452,9 +476,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// so it is safe to just ignore
return;
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ windowState.setCurrentNamespace(stateWindow);
} else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ windowState.setCurrentNamespace(context.window);
+ mergingWindows = null;
}
ACC contents = windowState.get();
@@ -507,13 +532,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@link MergingWindowSet#persist()}.
*/
protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
- ListState<Tuple2<W, W>> mergeState =
- getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- MergingWindowSet<W> mergingWindows = new MergingWindowSet<>((MergingWindowAssigner) windowAssigner, mergeState);
-
- return mergingWindows;
+ @SuppressWarnings("unchecked")
+ MergingWindowAssigner<? super IN, W> mergingAssigner = (MergingWindowAssigner<? super IN, W>) windowAssigner;
+ return new MergingWindowSet<>(mergingAssigner, mergingSetsState);
}
/**
@@ -655,11 +676,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
if (mergedWindows != null && mergedWindows.size() > 0) {
try {
- WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(window,
- mergedWindows,
- windowSerializer,
- stateDescriptor);
- } catch (Exception e) {
+ S rawState = getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor);
+
+ if (rawState instanceof InternalMergingState) {
+ @SuppressWarnings("unchecked")
+ InternalMergingState<W, ?, ?> mergingState = (InternalMergingState<W, ?, ?>) rawState;
+ mergingState.mergeNamespaces(window, mergedWindows);
+ }
+ else {
+ throw new IllegalArgumentException(
+ "The given state descriptor does not refer to a mergeable state (MergingState)");
+ }
+ }
+ catch (Exception e) {
throw new RuntimeException("Error while merging state.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 0e2d1e8..2faa506 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -72,6 +72,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -83,6 +84,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("serial")
public class WindowOperatorTest extends TestLogger {
// For counting if close() is called the correct number of times on the SumReducer
@@ -758,7 +760,7 @@ public class WindowOperatorTest extends TestLogger {
0);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -846,7 +848,7 @@ public class WindowOperatorTest extends TestLogger {
0);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -1124,7 +1126,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1184,7 +1186,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1250,7 +1252,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1310,7 +1312,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1385,7 +1387,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1475,7 +1477,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1559,7 +1561,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1643,7 +1645,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1736,7 +1738,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1821,7 +1823,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1902,11 +1904,11 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction2()),
- new EventTimeTriggerAccumGC(LATENESS),
+ new EventTimeTriggerAccumGC(LATENESS),
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -1929,7 +1931,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.close();
}
- private class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
+ private static class PassThroughFunction2 implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
private static final long serialVersionUID = 1L;
@Override
@@ -1960,7 +1962,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2006,7 +2008,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2063,7 +2065,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2108,7 +2110,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2152,7 +2154,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2172,6 +2174,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.close();
}
+ // TODO this test seems invalid, as it uses the unsupported combination of merging windows and folding window state
@Test
public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception {
final int GAP_SIZE = 3;
@@ -2206,7 +2209,7 @@ public class WindowOperatorTest extends TestLogger {
LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ;
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -2230,7 +2233,7 @@ public class WindowOperatorTest extends TestLogger {
// UDFs
// ------------------------------------------------------------------------
- private class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
+ private static class PassThroughFunction implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
private static final long serialVersionUID = 1L;
@Override
@@ -2289,7 +2292,7 @@ public class WindowOperatorTest extends TestLogger {
}
@SuppressWarnings("unchecked")
- private static class Tuple2ResultSortComparator implements Comparator<Object> {
+ private static class Tuple2ResultSortComparator implements Comparator<Object>, Serializable {
@Override
public int compare(Object o1, Object o2) {
if (o1 instanceof Watermark || o2 instanceof Watermark) {
@@ -2311,7 +2314,7 @@ public class WindowOperatorTest extends TestLogger {
}
@SuppressWarnings("unchecked")
- private static class Tuple3ResultSortComparator implements Comparator<Object> {
+ private static class Tuple3ResultSortComparator implements Comparator<Object>, Serializable {
@Override
public int compare(Object o1, Object o2) {
if (o1 instanceof Watermark || o2 instanceof Watermark) {
@@ -2403,15 +2406,11 @@ public class WindowOperatorTest extends TestLogger {
* purge the state of the fired window. This is to test the state
* garbage collection mechanism.
*/
- public class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
+ public static class EventTimeTriggerAccumGC extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private long cleanupTime;
- private EventTimeTriggerAccumGC() {
- cleanupTime = 0L;
- }
-
public EventTimeTriggerAccumGC(long cleanupTime) {
this.cleanupTime = cleanupTime;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 0e1aca0..0562443 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,9 +18,7 @@
package org.apache.flink.test.query;
-
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +28,14 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
@@ -59,14 +61,18 @@ public final class KVStateRequestSerializerRocksDBTest {
*/
final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
- RocksDBKeyedStateBackend2(final JobID jobId,
- final String operatorIdentifier,
- final ClassLoader userCodeClassLoader,
- final File instanceBasePath, final DBOptions dbOptions,
- final ColumnFamilyOptions columnFamilyOptions,
- final TaskKvStateRegistry kvStateRegistry,
- final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
- final KeyGroupRange keyGroupRange) throws Exception {
+ RocksDBKeyedStateBackend2(
+ final JobID jobId,
+ final String operatorIdentifier,
+ final ClassLoader userCodeClassLoader,
+ final File instanceBasePath,
+ final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions,
+ final TaskKvStateRegistry kvStateRegistry,
+ final TypeSerializer<K> keySerializer,
+ final int numberOfKeyGroups,
+ final KeyGroupRange keyGroupRange) throws Exception {
+
super(jobId, operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
@@ -74,9 +80,10 @@ public final class KVStateRequestSerializerRocksDBTest {
}
@Override
- public <N, T> ListState<T> createListState(
+ public <N, T> InternalListState<N, T> createListState(
final TypeSerializer<N> namespaceSerializer,
final ListStateDescriptor<T> stateDesc) throws Exception {
+
return super.createListState(namespaceSerializer, stateDesc);
}
}
@@ -90,8 +97,7 @@ public final class KVStateRequestSerializerRocksDBTest {
*/
@Test
public void testListSerialization() throws Exception {
- final long key = 0l;
- TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+ final long key = 0L;
// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
@@ -110,9 +116,10 @@ public final class KVStateRequestSerializerRocksDBTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final ListState<Long> listState = longHeapKeyedStateBackend
+ final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
KvStateRequestSerializerTest.testListSerialization(key, listState);
}
}