You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/21 13:12:33 UTC

flink git commit: [FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.

Repository: flink
Updated Branches:
  refs/heads/release-1.1 a7f6594b6 -> 52a4440d9


[FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52a4440d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52a4440d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52a4440d

Branch: refs/heads/release-1.1
Commit: 52a4440d916fb450c4999f6e1f42f392e247b426
Parents: a7f6594
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 20:49:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 15:12:02 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |   4 +-
 .../api/datastream/WindowedStream.java          |   4 +-
 .../operators/StateDescriptorPassingTest.java   | 219 +++++++++++++++++++
 3 files changed, 221 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index fa3b90d..41b131a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -469,9 +469,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
-					initialValue,
-					foldFunction,
-					resultType);
+					initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e81d7af..ae98619 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -489,9 +489,7 @@ public class WindowedStream<T, K, W extends Window> {
 
 		} else {
 			FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
-				initialValue,
-				foldFunction,
-				resultType);
+				initialValue, foldFunction, resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
 			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
new file mode 100644
index 0000000..b1e3fbb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that window operators created through the DataStream API receive state descriptors
+ * whose serializers were initialized from the environment's ExecutionConfig. Each test registers
+ * Kryo's JavaSerializer for File (an arbitrary generic type), builds - but never executes - a
+ * windowed job whose window state holds File values, and checks the registration was passed on.
+ */
+@SuppressWarnings("serial")
+public class StateDescriptorPassingTest {
+
+	@SuppressWarnings("unchecked") // the double cast below adapts JavaSerializer.class to the generic Class type
+	private final Class<? extends Serializer<?>> javaSerializer =
+			(Class<? extends Serializer<?>>) (Class<?>) JavaSerializer.class;
+	
+	@Test
+	public void testFoldWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+		DataStream<String> src = env.fromElements("abc");
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<String, String>() {
+					@Override
+					public String getKey(String value) {
+						return null; // never invoked: the job is only built and inspected, not run
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.fold(new File("/"), new FoldFunction<String, File>() {
+
+					@Override
+					public File fold(File a, String e) {
+						return null; // never invoked; only the File accumulator type matters here
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testReduceWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null;
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.reduce(new ReduceFunction<File>() {
+					
+					@Override
+					public File reduce(File value1, File value2) {
+						return null;
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testApplyWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+		
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null;
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.apply(new WindowFunction<File, String, String, TimeWindow>() {
+					@Override
+					public void apply(String s, TimeWindow window, 
+										Iterable<File> input, Collector<String> out) {}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testFoldWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+		DataStream<String> src = env.fromElements("abc");
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.fold(new File("/"), new FoldFunction<String, File>() {
+
+					@Override
+					public File fold(File a, String e) {
+						return null;
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testReduceWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.reduce(new ReduceFunction<File>() {
+
+					@Override
+					public File reduce(File value1, File value2) {
+						return null;
+					}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	@Test
+	public void testApplyWindowAllState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.timeWindowAll(Time.milliseconds(1000))
+				.apply(new AllWindowFunction<File, String, TimeWindow>() {
+					@Override
+					public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
+				});
+
+		validateStateDescriptorConfigured(result);
+	}
+
+	// ------------------------------------------------------------------------
+	//  generic validation
+	// ------------------------------------------------------------------------
+
+	private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) { // shared check for all tests
+		OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation(); // window ops are single-input
+		WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator();
+		StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+		// this would be the first statement to fail if the state descriptor's serializer was never initialized
+		TypeSerializer<?> serializer = descr.getSerializer();
+		assertTrue(serializer instanceof KryoSerializer); // File is expected to be handled as a generic (Kryo) type
+
+		Kryo kryo = ((KryoSerializer<?>) serializer).getKryo();
+
+		assertTrue("serializer registration was not properly passed on", 
+				kryo.getSerializer(File.class) instanceof JavaSerializer);
+	}
+}