You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this text version).
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/21 15:54:17 UTC
[4/6] flink git commit: [FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.
[FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d4eb64b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d4eb64b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d4eb64b
Branch: refs/heads/master
Commit: 4d4eb64be7490672771243147824a70d3d47c501
Parents: 82ef021
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 20:49:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 4 +-
.../api/datastream/WindowedStream.java | 4 +-
.../operators/windowing/WindowOperator.java | 43 ++--
.../operators/StateDescriptorPassingTest.java | 214 +++++++++++++++++++
.../operators/windowing/WindowOperatorTest.java | 2 +
5 files changed, 241 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4b083c8..6b09f3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -469,9 +469,7 @@ public class AllWindowedStream<T, W extends Window> {
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
- initialValue,
- foldFunction,
- resultType);
+ initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e81d7af..ae98619 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -489,9 +489,7 @@ public class WindowedStream<T, K, W extends Window> {
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
- initialValue,
- foldFunction,
- resultType);
+ initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 25ec519..dffa2a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -61,7 +61,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -74,7 +73,8 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An operator that implements the logic for windowing based on a {@link WindowAssigner} and
@@ -186,26 +186,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/**
* Creates a new {@code WindowOperator} based on the given policies and user functions.
*/
- public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- KeySelector<IN, K> keySelector,
- TypeSerializer<K> keySerializer,
- StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
- InternalWindowFunction<ACC, OUT, K, W> windowFunction,
- Trigger<? super IN, ? super W> trigger,
- long allowedLateness) {
+ public WindowOperator(
+ WindowAssigner<? super IN, W> windowAssigner,
+ TypeSerializer<W> windowSerializer,
+ KeySelector<IN, K> keySelector,
+ TypeSerializer<K> keySerializer,
+ StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
+ InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+ Trigger<? super IN, ? super W> trigger,
+ long allowedLateness) {
super(windowFunction);
- this.windowAssigner = requireNonNull(windowAssigner);
- this.windowSerializer = windowSerializer;
- this.keySelector = requireNonNull(keySelector);
- this.keySerializer = requireNonNull(keySerializer);
+ checkArgument(allowedLateness >= 0);
- this.windowStateDescriptor = windowStateDescriptor;
- this.trigger = requireNonNull(trigger);
+ checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
+ "window state serializer is not properly initialized");
- Preconditions.checkArgument(allowedLateness >= 0);
+ this.windowAssigner = checkNotNull(windowAssigner);
+ this.windowSerializer = checkNotNull(windowSerializer);
+ this.keySelector = checkNotNull(keySelector);
+ this.keySerializer = checkNotNull(keySerializer);
+ this.windowStateDescriptor = windowStateDescriptor;
+ this.trigger = checkNotNull(trigger);
this.allowedLateness = allowedLateness;
setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -666,7 +669,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public <S extends Serializable> ValueState<S> getKeyValueState(String name,
Class<S> stateType,
S defaultState) {
- requireNonNull(stateType, "The state type class must not be null");
+ checkNotNull(stateType, "The state type class must not be null");
TypeInformation<S> typeInfo;
try {
@@ -686,8 +689,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TypeInformation<S> stateType,
S defaultState) {
- requireNonNull(name, "The name of the state must not be null");
- requireNonNull(stateType, "The state type information must not be null");
+ checkNotNull(name, "The name of the state must not be null");
+ checkNotNull(stateType, "The state type information must not be null");
ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), defaultState);
return getPartitionedState(stateDesc);
http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
new file mode 100644
index 0000000..c0ca6a0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Various tests around the proper passing of state descriptors to the operators
+ * and their serialization.
+ *
+ * The tests use an arbitrary generic type ({@link File}, serialized via Kryo) to validate the behavior.
+ */
+@SuppressWarnings("serial")
+public class StateDescriptorPassingTest {
+
+ @Test
+ public void testFoldWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<String> src = env.fromElements("abc");
+
+ SingleOutputStreamOperator<?> result = src
+ .keyBy(new KeySelector<String, String>() {
+ @Override
+ public String getKey(String value) {
+ return null;
+ }
+ })
+ .timeWindow(Time.milliseconds(1000))
+ .fold(new File("/"), new FoldFunction<String, File>() {
+
+ @Override
+ public File fold(File a, String e) {
+ return null;
+ }
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ @Test
+ public void testReduceWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .keyBy(new KeySelector<File, String>() {
+ @Override
+ public String getKey(File value) {
+ return null;
+ }
+ })
+ .timeWindow(Time.milliseconds(1000))
+ .reduce(new ReduceFunction<File>() {
+
+ @Override
+ public File reduce(File value1, File value2) {
+ return null;
+ }
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ @Test
+ public void testApplyWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .keyBy(new KeySelector<File, String>() {
+ @Override
+ public String getKey(File value) {
+ return null;
+ }
+ })
+ .timeWindow(Time.milliseconds(1000))
+ .apply(new WindowFunction<File, String, String, TimeWindow>() {
+ @Override
+ public void apply(String s, TimeWindow window,
+ Iterable<File> input, Collector<String> out) {}
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ @Test
+ public void testFoldWindowAllState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<String> src = env.fromElements("abc");
+
+ SingleOutputStreamOperator<?> result = src
+ .timeWindowAll(Time.milliseconds(1000))
+ .fold(new File("/"), new FoldFunction<String, File>() {
+
+ @Override
+ public File fold(File a, String e) {
+ return null;
+ }
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ @Test
+ public void testReduceWindowAllState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .timeWindowAll(Time.milliseconds(1000))
+ .reduce(new ReduceFunction<File>() {
+
+ @Override
+ public File reduce(File value1, File value2) {
+ return null;
+ }
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ @Test
+ public void testApplyWindowAllState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .timeWindowAll(Time.milliseconds(1000))
+ .apply(new AllWindowFunction<File, String, TimeWindow>() {
+ @Override
+ public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
+ });
+
+ validateStateDescriptorConfigured(result);
+ }
+
+ // ------------------------------------------------------------------------
+ // generic validation
+ // ------------------------------------------------------------------------
+
+ private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+ OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation();
+ WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator();
+ StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+ // this would be the first statement to fail if state descriptors were not properly initialized
+ TypeSerializer<?> serializer = descr.getSerializer();
+ assertTrue(serializer instanceof KryoSerializer);
+
+ Kryo kryo = ((KryoSerializer<?>) serializer).getKryo();
+
+ assertTrue("serializer registration was not properly passed on",
+ kryo.getSerializer(File.class) instanceof JavaSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 67a6f55..fd73bcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -2102,6 +2102,7 @@ public class WindowOperatorTest extends TestLogger {
}
},
inputType);
+ windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
new WindowOperator<>(
@@ -2246,6 +2247,7 @@ public class WindowOperatorTest extends TestLogger {
}
},
inputType);
+ windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
new WindowOperator<>(