You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:32 UTC
[14/27] flink git commit: [FLINK-4381] Refactor State to Prepare For
Key-Group State Backends
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index f9698a8..1d07bdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -38,7 +39,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -54,10 +59,12 @@ import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
+import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -82,12 +89,9 @@ public class InterruptSensitiveRestoreTest {
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
cfg.setStreamOperator(new StreamSource<>(new TestSource()));
- StateHandle<Serializable> lockingHandle = new InterruptLockingStateHandle();
- StreamTaskState opState = new StreamTaskState();
- opState.setFunctionState(lockingHandle);
- StreamTaskStateList taskState = new StreamTaskStateList(new StreamTaskState[] { opState });
+ StreamStateHandle lockingHandle = new InterruptLockingStateHandle();
- TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, taskState);
+ TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, lockingHandle);
Task task = createTask(tdd);
// start the task and wait until it is in "restore"
@@ -113,7 +117,10 @@ public class InterruptSensitiveRestoreTest {
private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
Configuration taskConfig,
- StateHandle<?> state) throws IOException {
+ StreamStateHandle state) throws IOException {
+
+ ChainedStateHandle<StreamStateHandle> operatorState = new ChainedStateHandle<>(Collections.singletonList(state));
+ List<KeyGroupsStateHandle> keyGroupState = Collections.emptyList();
return new TaskDeploymentDescriptor(
new JobID(),
@@ -131,7 +138,8 @@ public class InterruptSensitiveRestoreTest {
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
0,
- new SerializedValue<StateHandle<?>>(state));
+ operatorState,
+ keyGroupState);
}
private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
@@ -159,14 +167,34 @@ public class InterruptSensitiveRestoreTest {
// ------------------------------------------------------------------------
@SuppressWarnings("serial")
- private static class InterruptLockingStateHandle implements StateHandle<Serializable> {
+ private static class InterruptLockingStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
- private transient volatile boolean closed;
-
@Override
- public Serializable getState(ClassLoader userCodeClassLoader) {
+ public FSDataInputStream openInputStream() throws Exception {
+ ensureNotClosed();
+ FSDataInputStream is = new FSDataInputStream() {
+
+ @Override
+ public void seek(long desired) throws IOException {
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ block();
+ throw new EOFException();
+ }
+ };
+ registerCloseable(is);
+ return is;
+ }
+
+ private void block() {
IN_RESTORE_LATCH.trigger();
-
// this mimics what happens in the HDFS client code.
// an interrupt on a waiting object leads to an infinite loop
try {
@@ -175,7 +203,7 @@ public class InterruptSensitiveRestoreTest {
}
}
catch (InterruptedException e) {
- while (!closed) {
+ while (!isClosed()) {
try {
synchronized (this) {
wait();
@@ -183,8 +211,6 @@ public class InterruptSensitiveRestoreTest {
} catch (InterruptedException ignored) {}
}
}
-
- return new SerializableObject();
}
@Override
@@ -194,11 +220,6 @@ public class InterruptSensitiveRestoreTest {
public long getStateSize() throws Exception {
return 0;
}
-
- @Override
- public void close() throws IOException {
- closed = true;
- }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 05b8e8c..5e82569 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -49,7 +49,9 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.mockito.invocation.InvocationOnMock;
@@ -310,7 +312,10 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+ public void acknowledgeCheckpoint(long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles) {
+
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index cfaeaad..66bc237 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -1,236 +1,234 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for asynchronous checkpoints.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("serial")
-public class StreamTaskAsyncCheckpointTest {
-
- /**
- * This ensures that asynchronous state handles are actually materialized asynchonously.
- *
- * <p>We use latches to block at various stages and see if the code still continues through
- * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
- * test will simply lock forever.
- * @throws Exception
- */
- @Test
- public void testAsyncCheckpoints() throws Exception {
- final OneShotLatch delayCheckpointLatch = new OneShotLatch();
- final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
- final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
-
- streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
- StreamMockEnvironment mockEnv = new StreamMockEnvironment(
- testHarness.jobConfig,
- testHarness.taskConfig,
- testHarness.memorySize,
- new MockInputSplitProvider(),
- testHarness.bufferSize) {
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return testHarness.executionConfig;
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId) {
- super.acknowledgeCheckpoint(checkpointId);
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
- super.acknowledgeCheckpoint(checkpointId, state);
-
- // block on the latch, to verify that triggerCheckpoint returns below,
- // even though the async checkpoint would not finish
- try {
- delayCheckpointLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- assertTrue(state instanceof StreamTaskStateList);
- StreamTaskStateList stateList = (StreamTaskStateList) state;
-
- // should be only one state
- StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
- StateHandle<?> operatorState = taskState.getOperatorState();
- assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
- TestStateHandle testState = (TestStateHandle) operatorState;
- assertEquals(42, testState.checkpointId);
- assertEquals(17, testState.timestamp);
-
- // we now know that the checkpoint went through
- ensureCheckpointLatch.trigger();
- }
- };
-
- testHarness.invoke(mockEnv);
-
- // wait for the task to be running
- for (Field field: StreamTask.class.getDeclaredFields()) {
- if (field.getName().equals("isRunning")) {
- field.setAccessible(true);
- while (!field.getBoolean(task)) {
- Thread.sleep(10);
- }
-
- }
- }
-
- task.triggerCheckpoint(42, 17);
-
- // now we allow the checkpoint
- delayCheckpointLatch.trigger();
-
- // wait for the checkpoint to go through
- ensureCheckpointLatch.await();
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
- }
-
-
- // ------------------------------------------------------------------------
-
- public static class AsyncCheckpointOperator
- extends AbstractStreamOperator<String>
- implements OneInputStreamOperator<String, String> {
- @Override
- public void processElement(StreamRecord<String> element) throws Exception {
- // we also don't care
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // not interested
- }
-
-
- @Override
- public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
- StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
- AsynchronousStateHandle<String> asyncState =
- new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
-
- taskState.setOperatorState(asyncState);
-
- return taskState;
- }
-
- @Override
- public void restoreState(StreamTaskState taskState) throws Exception {
- super.restoreState(taskState);
- }
- }
-
- private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
-
- private final long checkpointId;
- private final long timestamp;
-
- public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
- this.checkpointId = checkpointId;
- this.timestamp = timestamp;
- }
-
- @Override
- public StateHandle<String> materialize() throws Exception {
- return new TestStateHandle(checkpointId, timestamp);
- }
-
- @Override
- public long getStateSize() {
- return 0;
- }
-
- @Override
- public void close() throws IOException {}
- }
-
- private static class TestStateHandle implements StateHandle<String> {
-
- public final long checkpointId;
- public final long timestamp;
-
- public TestStateHandle(long checkpointId, long timestamp) {
- this.checkpointId = checkpointId;
- this.timestamp = timestamp;
- }
-
- @Override
- public String getState(ClassLoader userCodeClassLoader) throws Exception {
- return null;
- }
-
- @Override
- public void discardState() throws Exception {}
-
- @Override
- public long getStateSize() {
- return 0;
- }
-
- @Override
- public void close() throws IOException {}
- }
-
- public static class DummyMapFunction<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) { return value; }
- }
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.runtime.tasks;
+//
+//import org.apache.flink.api.common.ExecutionConfig;
+//import org.apache.flink.api.common.functions.MapFunction;
+//import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+//import org.apache.flink.core.testutils.OneShotLatch;
+//import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+//import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+//import org.apache.flink.streaming.api.graph.StreamConfig;
+//import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+//import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+//import org.apache.flink.streaming.api.watermark.Watermark;
+//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+//import org.junit.Test;
+//import org.junit.runner.RunWith;
+//import org.powermock.core.classloader.annotations.PowerMockIgnore;
+//import org.powermock.core.classloader.annotations.PrepareForTest;
+//import org.powermock.modules.junit4.PowerMockRunner;
+//
+//import java.io.IOException;
+//import java.lang.reflect.Field;
+//
+//import static org.junit.Assert.assertEquals;
+//import static org.junit.Assert.assertTrue;
+//
+///**
+// * Tests for asynchronous checkpoints.
+// */
+//@RunWith(PowerMockRunner.class)
+//@PrepareForTest(ResultPartitionWriter.class)
+//@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+//@SuppressWarnings("serial")
+//public class StreamTaskAsyncCheckpointTest {
+//
+// /**
+// * This ensures that asynchronous state handles are actually materialized asynchronously.
+// *
+// * <p>We use latches to block at various stages and see if the code still continues through
+// * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+// * test will simply lock forever.
+// * @throws Exception
+// */
+// @Test
+// public void testAsyncCheckpoints() throws Exception {
+// final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+// final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+//
+// final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+//
+// final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+//
+// StreamConfig streamConfig = testHarness.getStreamConfig();
+//
+// streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+//
+// StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+// testHarness.jobConfig,
+// testHarness.taskConfig,
+// testHarness.memorySize,
+// new MockInputSplitProvider(),
+// testHarness.bufferSize) {
+//
+// @Override
+// public ExecutionConfig getExecutionConfig() {
+// return testHarness.executionConfig;
+// }
+//
+// @Override
+// public void acknowledgeCheckpoint(long checkpointId) {
+// super.acknowledgeCheckpoint(checkpointId);
+// }
+//
+// @Override
+// public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+// super.acknowledgeCheckpoint(checkpointId, state);
+//
+// // block on the latch, to verify that triggerCheckpoint returns below,
+// // even though the async checkpoint would not finish
+// try {
+// delayCheckpointLatch.await();
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+//
+// assertTrue(state instanceof StreamTaskStateList);
+// StreamTaskStateList stateList = (StreamTaskStateList) state;
+//
+// // should be only one state
+// StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
+// StateHandle<?> operatorState = taskState.getOperatorState();
+// assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
+// TestStateHandle testState = (TestStateHandle) operatorState;
+// assertEquals(42, testState.checkpointId);
+// assertEquals(17, testState.timestamp);
+//
+// // we now know that the checkpoint went through
+// ensureCheckpointLatch.trigger();
+// }
+// };
+//
+// testHarness.invoke(mockEnv);
+//
+// // wait for the task to be running
+// for (Field field: StreamTask.class.getDeclaredFields()) {
+// if (field.getName().equals("isRunning")) {
+// field.setAccessible(true);
+// while (!field.getBoolean(task)) {
+// Thread.sleep(10);
+// }
+//
+// }
+// }
+//
+// task.triggerCheckpoint(42, 17);
+//
+// // now we allow the checkpoint
+// delayCheckpointLatch.trigger();
+//
+// // wait for the checkpoint to go through
+// ensureCheckpointLatch.await();
+//
+// testHarness.endInput();
+// testHarness.waitForTaskCompletion();
+// }
+//
+//
+// // ------------------------------------------------------------------------
+//
+// public static class AsyncCheckpointOperator
+// extends AbstractStreamOperator<String>
+// implements OneInputStreamOperator<String, String> {
+// @Override
+// public void processElement(StreamRecord<String> element) throws Exception {
+// // we also don't care
+// }
+//
+// @Override
+// public void processWatermark(Watermark mark) throws Exception {
+// // not interested
+// }
+//
+//
+// @Override
+// public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
+// StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+//
+// AsynchronousStateHandle<String> asyncState =
+// new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
+//
+// taskState.setOperatorState(asyncState);
+//
+// return taskState;
+// }
+//
+// @Override
+// public void restoreState(StreamTaskState taskState) throws Exception {
+// super.restoreState(taskState);
+// }
+// }
+//
+// private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
+//
+// private final long checkpointId;
+// private final long timestamp;
+//
+// public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
+// this.checkpointId = checkpointId;
+// this.timestamp = timestamp;
+// }
+//
+// @Override
+// public StateHandle<String> materialize() throws Exception {
+// return new TestStateHandle(checkpointId, timestamp);
+// }
+//
+// @Override
+// public long getStateSize() {
+// return 0;
+// }
+//
+// @Override
+// public void close() throws IOException {}
+// }
+//
+// private static class TestStateHandle implements StateHandle<String> {
+//
+// public final long checkpointId;
+// public final long timestamp;
+//
+// public TestStateHandle(long checkpointId, long timestamp) {
+// this.checkpointId = checkpointId;
+// this.timestamp = timestamp;
+// }
+//
+// @Override
+// public String getState(ClassLoader userCodeClassLoader) throws Exception {
+// return null;
+// }
+//
+// @Override
+// public void discardState() throws Exception {}
+//
+// @Override
+// public long getStateSize() {
+// return 0;
+// }
+//
+// @Override
+// public void close() throws IOException {}
+// }
+//
+// public static class DummyMapFunction<T> implements MapFunction<T, T> {
+// @Override
+// public T map(T value) { return value; }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 675e7b6..bc255ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -22,24 +22,18 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -75,47 +69,41 @@ public class MockContext<IN, OUT> {
return output;
}
- public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
+ public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) throws Exception {
return createAndExecuteForKeyedStream(operator, inputs, null, null);
}
public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(
OneInputStreamOperator<IN, OUT> operator, List<IN> inputs,
- KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) {
+ KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) throws Exception {
+
+ OneInputStreamOperatorTestHarness<IN, OUT> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.configureForKeyedStream(keySelector, keyType);
+
+ testHarness.setup();
+ testHarness.open();
- MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
+ operator.open();
- StreamConfig config = new StreamConfig(new Configuration());
- if (keySelector != null && keyType != null) {
- config.setStateKeySerializer(keyType.createSerializer(new ExecutionConfig()));
- config.setStatePartitioner(0, keySelector);
+ for (IN in: inputs) {
+ testHarness.processElement(new StreamRecord<>(in));
}
-
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- operator.setup(mockTask, config, mockContext.output);
- try {
- operator.open();
-
- StreamRecord<IN> record = new StreamRecord<IN>(null);
- for (IN in: inputs) {
- record = record.replace(in);
- synchronized (lock) {
- operator.setKeyContextElement1(record);
- operator.processElement(record);
- }
- }
- operator.close();
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke operator.", e);
- } finally {
- timerService.shutdownNow();
+ testHarness.close();
+
+ ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+
+ List<OUT> result = new ArrayList<>();
+
+ for (Object o : output) {
+ if (o instanceof StreamRecord) {
+ result.add((OUT) ((StreamRecord) o).getValue());
+ }
}
- return mockContext.getOutputs();
+ return result;
}
private static StreamTask<?, ?> createMockTaskWithTimer(
@@ -149,22 +137,6 @@ public class MockContext<IN, OUT> {
}
}).when(task).registerTimer(anyLong(), any(Triggerable.class));
-
- try {
- doAnswer(new Answer<AbstractStateBackend>() {
- @Override
- public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
- final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
- final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];
- MemoryStateBackend backend = MemoryStateBackend.create();
- backend.initializeForJob(new DummyEnvironment("dummty", 1, 0), operatorIdentifier, keySerializer);
- return backend;
- }
- }).when(task).createStateBackend(any(String.class), any(TypeSerializer.class));
- } catch (Exception e) {
- e.printStackTrace();
- }
-
return task;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6855989..6cb46d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -27,28 +27,23 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
-import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
@@ -73,9 +68,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final ConcurrentLinkedQueue<Object> outputList;
final StreamConfig config;
-
+
final ExecutionConfig executionConfig;
-
+
final Object checkpointLock;
final TimeServiceProvider timeServiceProvider;
@@ -89,8 +84,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
* Whether setup() was called on the operator. This is reset when calling close().
*/
private boolean setupCalled = false;
-
-
+
+
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
this(operator, new ExecutionConfig());
}
@@ -107,17 +102,20 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
TimeServiceProvider testTimeProvider) {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<Object>();
- this.config = new StreamConfig(new Configuration());
+ Configuration underlyingConfig = new Configuration();
+ this.config = new StreamConfig(underlyingConfig);
+ this.config.setCheckpointingEnabled(true);
this.executionConfig = executionConfig;
this.checkpointLock = new Object();
- final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+ final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig);
mockTask = mock(StreamTask.class);
timeServiceProvider = testTimeProvider;
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
+ when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
@@ -173,8 +171,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
ClosureCleaner.clean(keySelector, false);
config.setStatePartitioner(0, keySelector);
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+ config.setKeyGroupAssigner(new HashKeyGroupAssigner<K>(10));
}
-
+
/**
* Get all the output from the task. This contains StreamRecords and Events interleaved. Use
* {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
@@ -206,47 +205,30 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotOperatorState(long, long)}
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#snapshotState(org.apache.flink.core.fs.FSDataOutputStream, long, long)}
*/
- public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
- StreamTaskState snapshot = operator.snapshotOperatorState(checkpointId, timestamp);
- // materialize asynchronous state handles
- if (snapshot != null) {
- if (snapshot.getFunctionState() instanceof AsynchronousStateHandle) {
- AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) snapshot.getFunctionState();
- snapshot.setFunctionState(asyncState.materialize());
- }
- if (snapshot.getOperatorState() instanceof AsynchronousStateHandle) {
- AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) snapshot.getOperatorState();
- snapshot.setOperatorState(asyncState.materialize());
- }
- if (snapshot.getKvStates() != null) {
- Set<String> keys = snapshot.getKvStates().keySet();
- HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = snapshot.getKvStates();
- for (String key: keys) {
- if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
- AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
- kvStates.put(key, asyncHandle.materialize());
- }
- }
- }
-
- }
- return snapshot;
+ public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
+ // simply use an in-memory handle
+ MemoryStateBackend backend = new MemoryStateBackend();
+ AbstractStateBackend.CheckpointStateOutputStream outStream =
+ backend.createCheckpointStateOutputStream(checkpointId, timestamp);
+ operator.snapshotState(outStream, checkpointId, timestamp);
+ return outStream.closeAndGetHandle();
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
*/
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
operator.notifyOfCompletedCheckpoint(checkpointId);
}
+
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)}
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)}
*/
- public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
- operator.restoreState(snapshot);
+ public void restore(StreamStateHandle snapshot) throws Exception {
+ operator.restoreState(snapshot.openInputStream());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index faf38a3..779436a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -30,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -167,20 +169,20 @@ public class WindowingTestHarness<K, IN, W extends Window> {
/**
* Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
*/
- public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
+ public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
return testHarness.snapshot(checkpointId, timestamp);
}
/**
- * Resumes execution from a provided {@link StreamTaskState}. This is used to test recovery after a failure.
+ * Resumes execution from a provided {@link StreamStateHandle}. This is used to test recovery after a failure.
*/
- public void restore(StreamTaskState snapshot, long recoveryTime) throws Exception {
+ public void restore(StreamStateHandle stateHandle) throws Exception {
Preconditions.checkArgument(!isOpen,
"You are trying to restore() while the operator is still open. " +
"Please call close() first.");
testHarness.setup();
- testHarness.restore(snapshot, recoveryTime);
+ testHarness.restore(stateHandle);
openOperator();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 199a6af..97c8339 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -73,6 +73,8 @@ import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+ private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
private static final int PARALLELISM = 4;
private static ForkableFlinkMiniCluster cluster;
@@ -109,7 +111,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
public void initStateBackend() throws IOException {
switch (stateBackendEnum) {
case MEM:
- this.stateBackend = new MemoryStateBackend();
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
break;
case FILE: {
String backups = tempFolder.newFolder().getAbsolutePath();
@@ -119,7 +121,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
case ROCKSDB: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+ RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
rdb.setDbStoragePath(rocksDb);
this.stateBackend = rdb;
break;
@@ -127,7 +129,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
case ROCKSDB_FULLY_ASYNC: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+ RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
rdb.setDbStoragePath(rocksDb);
rdb.enableFullyAsyncSnapshots();
this.stateBackend = rdb;
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 0de2a75..8d1baeb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -145,7 +146,7 @@ public class RescalingITCase extends TestLogger {
for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = keyGroupAssigner.getKeyGroupIndex(key);
- expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+ expectedResult.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex), numberElements * key));
}
assertEquals(expectedResult, actualResult);
@@ -188,7 +189,7 @@ public class RescalingITCase extends TestLogger {
for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = keyGroupAssigner2.getKeyGroupIndex(key);
- expectedResult2.add(Tuple2.of(keyGroupIndex % parallelism2, key * (numberElements + numberElements2)));
+ expectedResult2.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex), key * (numberElements + numberElements2)));
}
assertEquals(expectedResult2, actualResult2);
@@ -351,7 +352,8 @@ public class RescalingITCase extends TestLogger {
for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = keyGroupAssigner.getKeyGroupIndex(key);
- expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+// expectedResult.add(Tuple2.of(keyGroupIndex % parallelism, numberElements * key));
+ expectedResult.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupIndex) , numberElements * key));
}
assertEquals(expectedResult, actualResult);
@@ -401,7 +403,7 @@ public class RescalingITCase extends TestLogger {
for (int key = 0; key < numberKeys; key++) {
int keyGroupIndex = keyGroupAssigner2.getKeyGroupIndex(key);
- expectedResult2.add(Tuple2.of(keyGroupIndex % parallelism2, key * (numberElements + numberElements2)));
+ expectedResult2.add(Tuple2.of(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism2, keyGroupIndex), key * (numberElements + numberElements2)));
}
assertEquals(expectedResult2, actualResult2);
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 4fc310c..550ba75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -30,11 +30,10 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -46,8 +45,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
@@ -61,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
@@ -71,7 +70,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
@@ -219,7 +217,7 @@ public class SavepointITCase extends TestLogger {
new RequestSavepoint(savepointPath),
deadline.timeLeft());
- SavepointV0 savepoint = (SavepointV0) ((ResponseSavepoint) Await.result(
+ SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
savepointFuture, deadline.timeLeft())).savepoint();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
@@ -334,7 +332,7 @@ public class SavepointITCase extends TestLogger {
assertNotNull(subtaskState);
errMsg = "Initial operator state mismatch.";
- assertEquals(errMsg, subtaskState.getState(), tdd.getOperatorState());
+ assertEquals(errMsg, subtaskState.getChainedStateHandle(), tdd.getOperatorState());
}
}
@@ -345,7 +343,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Disposing savepoint " + savepointPath + ".");
Future<Object> disposeFuture = jobManager.ask(
- new DisposeSavepoint(savepointPath, Option.<List<BlobKey>>empty()),
+ new DisposeSavepoint(savepointPath),
deadline.timeLeft());
errMsg = "Failed to dispose savepoint " + savepointPath + ".";
@@ -360,14 +358,13 @@ public class SavepointITCase extends TestLogger {
for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
- StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState()
- .deserializeValue(ClassLoader.getSystemClassLoader());
+ ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle();
- for (StreamTaskState taskState : taskStateList.getState(
- ClassLoader.getSystemClassLoader())) {
-
- AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState();
- checkpointFiles.add(new File(fsState.getFilePath().toUri()));
+ for (int i = 0; i < streamTaskState.getLength(); i++) {
+ if (streamTaskState.get(i) != null) {
+ FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
+ checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+ }
}
}
}
@@ -499,21 +496,20 @@ public class SavepointITCase extends TestLogger {
new RequestSavepoint(savepointPath),
deadline.timeLeft());
- SavepointV0 savepoint = (SavepointV0) ((ResponseSavepoint) Await.result(
+ SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
savepointFuture, deadline.timeLeft())).savepoint();
LOG.info("Retrieved savepoint: " + savepointPath + ".");
// Check that all checkpoint files have been removed
for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
- StreamTaskStateList taskStateList = (StreamTaskStateList) subtaskState.getState()
- .deserializeValue(ClassLoader.getSystemClassLoader());
+ ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle();
- for (StreamTaskState taskState : taskStateList.getState(
- ClassLoader.getSystemClassLoader())) {
-
- AbstractFileStateHandle fsState = (AbstractFileStateHandle) taskState.getFunctionState();
- checkpointFiles.add(new File(fsState.getFilePath().toUri()));
+ for (int i = 0; i < streamTaskState.getLength(); i++) {
+ if (streamTaskState.get(i) != null) {
+ FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
+ checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index cdc7a80..3bc3cf5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -303,7 +303,7 @@ public class ClassLoaderITCase extends TestLogger {
// Dispose savepoint
LOG.info("Disposing savepoint at " + savepointPath);
- Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath, Option.apply(blobKeys)), deadline.timeLeft());
+ Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
index c7d9a42..b3e2137 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.test.state;
import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateObject;
import org.junit.Test;
import org.reflections.Reflections;
@@ -34,7 +34,7 @@ import static org.junit.Assert.*;
public class StateHandleSerializationTest {
/**
- * This test validates that all subclasses of {@link StateHandle} have a proper
+ * This test validates that all subclasses of {@link StateObject} have a proper
* serial version UID.
*/
@Test
@@ -46,7 +46,7 @@ public class StateHandleSerializationTest {
@SuppressWarnings("unchecked")
Set<Class<?>> stateHandleImplementations = (Set<Class<?>>) (Set<?>)
- reflections.getSubTypesOf(StateHandle.class);
+ reflections.getSubTypesOf(StateObject.class);
for (Class<?> clazz : stateHandleImplementations) {
validataSerialVersionUID(clazz);
@@ -73,7 +73,7 @@ public class StateHandleSerializationTest {
private static void validataSerialVersionUID(Class<?> clazz) {
// all non-interface types must have a serial version UID
if (!clazz.isInterface()) {
- assertFalse("Anonymous state handle classes have problematic serialization behavior",
+ assertFalse("Anonymous state handle classes have problematic serialization behavior: " + clazz,
clazz.isAnonymousClass());
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 6288946..41455cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -34,13 +34,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;
-import java.io.Serializable;
-
import static org.junit.Assert.fail;
public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -132,13 +129,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
long timestamp) throws Exception {
return null;
}
-
- @Override
- public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
- long checkpointID,
- long timestamp) throws Exception {
- return null;
- }
}
static final class SuccessException extends Exception {