You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/15 10:22:41 UTC
flink git commit: [FLINK-5335] Allow ListCheckpointed user functions to return null
Repository: flink
Updated Branches:
refs/heads/master 09e081730 -> 5dab9345c
[FLINK-5335] Allow ListCheckpointed user functions to return null
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5dab9345
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5dab9345
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5dab9345
Branch: refs/heads/master
Commit: 5dab9345c70bbdc55fbc357c4eb8d28da64aa1aa
Parents: 09e0817
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 14 12:13:15 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Dec 15 11:22:03 2016 +0100
----------------------------------------------------------------------
.../api/checkpoint/ListCheckpointed.java | 1 +
.../operators/AbstractUdfStreamOperator.java | 8 +-
.../api/checkpoint/ListCheckpointedTest.java | 103 +++++++++++++++++++
.../StateInitializationContextImplTest.java | 21 ++++
4 files changed, 130 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 1031b88..5e85dc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -41,6 +41,7 @@ public interface ListCheckpointed<T extends Serializable> {
* @param checkpointId The ID of the checkpoint.
* @param timestamp Timestamp of the checkpoint.
* @return The operator state in a list of redistributable, atomic sub-states.
+ * Should not return null, but empty list instead.
* @throws Exception Thrown if the creation of the state object failed. This causes the
* checkpoint to fail. The system may decide to fail the operation (and trigger
* recovery), or to discard this checkpoint attempt and to continue running
http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1404958..81f709b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -107,15 +107,17 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
} else if (userFunction instanceof ListCheckpointed) {
@SuppressWarnings("unchecked")
List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
- snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
+ snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
ListState<Serializable> listState = getOperatorStateBackend().
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
listState.clear();
- for (Serializable statePartition : partitionableState) {
- listState.add(statePartition);
+ if (null != partitionableState) {
+ for (Serializable statePartition : partitionableState) {
+ listState.add(statePartition);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
new file mode 100644
index 0000000..6751617
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ListCheckpointedTest {
+
+ @Test
+ public void testUDFReturningNull() throws Exception {
+ TestUserFunction userFunction = new TestUserFunction(null);
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+ @Test
+ public void testUDFReturningEmpty() throws Exception {
+ TestUserFunction userFunction = new TestUserFunction(Collections.<Integer>emptyList());
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+ @Test
+ public void testUDFReturningData() throws Exception {
+ TestUserFunction userFunction = new TestUserFunction(Arrays.asList(1, 2, 3));
+ AbstractStreamOperatorTestHarness<Integer> testHarness =
+ new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0);
+ testHarness.open();
+ OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+ testHarness.initializeState(snapshot);
+ Assert.assertTrue(userFunction.isRestored());
+ }
+
+ private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> {
+
+ private static final long serialVersionUID = -8981369286399531925L;
+
+ private final List<Integer> expected;
+ private boolean restored;
+
+ public TestUserFunction(List<Integer> expected) {
+ this.expected = expected;
+ this.restored = false;
+ }
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return expected;
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (null != expected) {
+ Assert.assertEquals(expected, state);
+ } else {
+ Assert.assertTrue(state.isEmpty());
+ }
+ restored = true;
+ }
+
+ public boolean isRestored() {
+ return restored;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5dab9345/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index cd94076..39dc5d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -131,6 +131,27 @@ public class StateInitializationContextImplTest {
@Test
public void getOperatorStateStreams() throws Exception {
+ int i = 0;
+ int s = 0;
+ for (StatePartitionStreamProvider streamProvider : initializationContext.getRawOperatorStateInputs()) {
+ if (0 == i % 4) {
+ ++i;
+ }
+ Assert.assertNotNull(streamProvider);
+ try (InputStream is = streamProvider.getStream()) {
+ DataInputView div = new DataInputViewStreamWrapper(is);
+
+ int val = div.readInt();
+ Assert.assertEquals(i * NUM_HANDLES + s, val);
+ }
+
+ ++s;
+ if (s == i % 4) {
+ s = 0;
+ ++i;
+ }
+ }
+
}
@Test