You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/15 10:22:41 UTC

flink git commit: [FLINK-5335] Allow ListCheckpointed user functions to return null

Repository: flink
Updated Branches:
  refs/heads/master 09e081730 -> 5dab9345c


[FLINK-5335] Allow ListCheckpointed user functions to return null


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dab9345
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dab9345
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dab9345

Branch: refs/heads/master
Commit: 5dab9345c70bbdc55fbc357c4eb8d28da64aa1aa
Parents: 09e0817
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 14 12:13:15 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 15 11:22:03 2016 +0100

----------------------------------------------------------------------
 .../api/checkpoint/ListCheckpointed.java        |   1 +
 .../operators/AbstractUdfStreamOperator.java    |   8 +-
 .../api/checkpoint/ListCheckpointedTest.java    | 103 +++++++++++++++++++
 .../StateInitializationContextImplTest.java     |  21 ++++
 4 files changed, 130 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 1031b88..5e85dc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -41,6 +41,7 @@ public interface ListCheckpointed<T extends Serializable> {
 	 * @param checkpointId The ID of the checkpoint.
 	 * @param timestamp Timestamp of the checkpoint.
 	 * @return The operator state in a list of redistributable, atomic sub-states.
+	 *         Should not return null, but empty list instead.
 	 * @throws Exception Thrown if the creation of the state object failed. This causes the
 	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
 	 *                   recovery), or to discard this checkpoint attempt and to continue running

http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1404958..81f709b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -107,15 +107,17 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 		} else if (userFunction instanceof ListCheckpointed) {
 			@SuppressWarnings("unchecked")
 			List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
-							snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
+					snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
 
 			ListState<Serializable> listState = getOperatorStateBackend().
 					getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 			listState.clear();
 
-			for (Serializable statePartition : partitionableState) {
-				listState.add(statePartition);
+			if (null != partitionableState) {
+				for (Serializable statePartition : partitionableState) {
+					listState.add(statePartition);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
new file mode 100644
index 0000000..6751617
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ListCheckpointedTest {
+
+	@Test
+	public void testUDFReturningNull() throws Exception {
+		TestUserFunction userFunction = new TestUserFunction(null);
+		AbstractStreamOperatorTestHarness<Integer> testHarness =
+				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+		testHarness.open();
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.initializeState(snapshot);
+		Assert.assertTrue(userFunction.isRestored());
+	}
+
+	@Test
+	public void testUDFReturningEmpty() throws Exception {
+		TestUserFunction userFunction = new TestUserFunction(Collections.<Integer>emptyList());
+		AbstractStreamOperatorTestHarness<Integer> testHarness =
+				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+		testHarness.open();
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.initializeState(snapshot);
+		Assert.assertTrue(userFunction.isRestored());
+	}
+
+	@Test
+	public void testUDFReturningData() throws Exception {
+		TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
+		AbstractStreamOperatorTestHarness<Integer> testHarness =
+				new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+		testHarness.open();
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.initializeState(snapshot);
+		Assert.assertTrue(userFunction.isRestored());
+	}
+
+	private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> {
+
+		private static final long serialVersionUID = -8981369286399531925L;
+
+		private final List<Integer> expected;
+		private boolean restored;
+
+		public TestUserFunction(List<Integer> expected) {
+			this.expected = expected;
+			this.restored = false;
+		}
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return expected;
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			if (null != expected) {
+				Assert.assertEquals(expected, state);
+			} else {
+				Assert.assertTrue(state.isEmpty());
+			}
+			restored = true;
+		}
+
+		public boolean isRestored() {
+			return restored;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index cd94076..39dc5d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -131,6 +131,27 @@ public class StateInitializationContextImplTest {
 	@Test
 	public void getOperatorStateStreams() throws Exception {
 
+		int i = 0;
+		int s = 0;
+		for (StatePartitionStreamProvider streamProvider : initializationContext.getRawOperatorStateInputs()) {
+			if (0 == i % 4) {
+				++i;
+			}
+			Assert.assertNotNull(streamProvider);
+			try (InputStream is = streamProvider.getStream()) {
+				DataInputView div = new DataInputViewStreamWrapper(is);
+
+				int val = div.readInt();
+				Assert.assertEquals(i * NUM_HANDLES + s, val);
+			}
+
+			++s;
+			if (s == i % 4) {
+				s = 0;
+				++i;
+			}
+		}
+
 	}
 
 	@Test