You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/21 15:54:17 UTC

[4/6] flink git commit: [FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.

[FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d4eb64b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d4eb64b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d4eb64b

Branch: refs/heads/master
Commit: 4d4eb64be7490672771243147824a70d3d47c501
Parents: 82ef021
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 20:49:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |   4 +-
 .../api/datastream/WindowedStream.java          |   4 +-
 .../operators/windowing/WindowOperator.java     |  43 ++--
 .../operators/StateDescriptorPassingTest.java   | 214 +++++++++++++++++++
 .../operators/windowing/WindowOperatorTest.java |   2 +
 5 files changed, 241 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4b083c8..6b09f3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -469,9 +469,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
-					initialValue,
-					foldFunction,
-					resultType);
+					initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e81d7af..ae98619 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -489,9 +489,7 @@ public class WindowedStream<T, K, W extends Window> {
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
-				initialValue,
-				foldFunction,
-				resultType);
+				initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 25ec519..dffa2a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -61,7 +61,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -74,7 +73,8 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
@@ -186,26 +186,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	/**
 	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 	 */
-	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-		TypeSerializer<W> windowSerializer,
-		KeySelector<IN, K> keySelector,
-		TypeSerializer<K> keySerializer,
-		StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
-		InternalWindowFunction<ACC, OUT, K, W> windowFunction,
-		Trigger<? super IN, ? super W> trigger,
-		long allowedLateness) {
+	public WindowOperator(
+			WindowAssigner<? super IN, W> windowAssigner,
+			TypeSerializer<W> windowSerializer,
+			KeySelector<IN, K> keySelector,
+			TypeSerializer<K> keySerializer,
+			StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
+			InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			long allowedLateness) {
 
 		super(windowFunction);
 
-		this.windowAssigner = requireNonNull(windowAssigner);
-		this.windowSerializer = windowSerializer;
-		this.keySelector = requireNonNull(keySelector);
-		this.keySerializer = requireNonNull(keySerializer);
+		checkArgument(allowedLateness >= 0);
 
-		this.windowStateDescriptor = windowStateDescriptor;
-		this.trigger = requireNonNull(trigger);
+		checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
+				"window state serializer is not properly initialized");
 
-		Preconditions.checkArgument(allowedLateness >= 0);
+		this.windowAssigner = checkNotNull(windowAssigner);
+		this.windowSerializer = checkNotNull(windowSerializer);
+		this.keySelector = checkNotNull(keySelector);
+		this.keySerializer = checkNotNull(keySerializer);
+		this.windowStateDescriptor = windowStateDescriptor;
+		this.trigger = checkNotNull(trigger);
 		this.allowedLateness = allowedLateness;
 
 		setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -666,7 +669,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		public <S extends Serializable> ValueState<S> getKeyValueState(String name,
 			Class<S> stateType,
 			S defaultState) {
-			requireNonNull(stateType, "The state type class must not be null");
+			checkNotNull(stateType, "The state type class must not be null");
 
 			TypeInformation<S> typeInfo;
 			try {
@@ -686,8 +689,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			TypeInformation<S> stateType,
 			S defaultState) {
 
-			requireNonNull(name, "The name of the state must not be null");
-			requireNonNull(stateType, "The state type information must not be null");
+			checkNotNull(name, "The name of the state must not be null");
+			checkNotNull(stateType, "The state type information must not be null");
 
 			ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), defaultState);
 			return getPartitionedState(stateDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
new file mode 100644
index 0000000..c0ca6a0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that window operations (fold / reduce / apply, keyed and non-keyed) pass a state descriptor
+ * to the {@link WindowOperator} whose serializer is initialized and carries the Kryo registrations.
+ *
+ * <p>{@link File} is used as an arbitrary generic type to validate the behavior.
+ */
+@SuppressWarnings("serial")
+public class StateDescriptorPassingTest {
+
+	@Test
+	public void testFoldWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+
+		DataStream<String> src = env.fromElements("abc");
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<String, String>() {
+					@Override
+					public String getKey(String value) {
+						return null; // dummy key; the test never executes the job
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.fold(new File("/"), new FoldFunction<String, File>() {
+
+					@Override
+					public File fold(File a, String e) {
+						return null; // dummy result; the test never executes the job
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testReduceWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null; // dummy key; the test never executes the job
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.reduce(new ReduceFunction<File>() {
+					
+					@Override
+					public File reduce(File value1, File value2) {
+						return null; // dummy result; the test never executes the job
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testApplyWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+		
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null; // dummy key; the test never executes the job
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.apply(new WindowFunction<File, String, String, TimeWindow>() {
+					@Override
+					public void apply(String s, TimeWindow window, 
+										Iterable<File> input, Collector<String> out) {} // no-op; the test never executes the job
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testFoldWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+
+		DataStream<String> src = env.fromElements("abc");
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.fold(new File("/"), new FoldFunction<String, File>() {
+
+					@Override
+					public File fold(File a, String e) {
+						return null; // dummy result; the test never executes the job
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testReduceWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.reduce(new ReduceFunction<File>() {
+
+					@Override
+					public File reduce(File value1, File value2) {
+						return null; // dummy result; the test never executes the job
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testApplyWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class); // checked by validateStateDescriptorConfigured()
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.apply(new AllWindowFunction<File, String, TimeWindow>() {
+					@Override
+					public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {} // no-op; the test never executes the job
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	// ------------------------------------------------------------------------
+	//  generic validation, shared by all tests in this class
+	// ------------------------------------------------------------------------
+
+	private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+		OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation();
+		WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator(); // each test builds a window operation
+		StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+		// first statement expected to fail if the descriptor's serializer was not properly initialized
+		TypeSerializer<?> serializer = descr.getSerializer();
+		assertTrue(serializer instanceof KryoSerializer); // the window state type in every test is File
+
+		Kryo kryo = ((KryoSerializer<?>) serializer).getKryo();
+
+		assertTrue("serializer registration was not properly passed on", 
+				kryo.getSerializer(File.class) instanceof JavaSerializer); // registered on the environment by each test
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 67a6f55..fd73bcc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -2102,6 +2102,7 @@ public class WindowOperatorTest extends TestLogger {
 					}
 				},
 				inputType);
+		windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
 			new WindowOperator<>(
@@ -2246,6 +2247,7 @@ public class WindowOperatorTest extends TestLogger {
 					}
 				},
 				inputType);
+		windowStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
 			new WindowOperator<>(