You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/10 12:51:02 UTC
flink git commit: [FLINK-4788] [streaming api] Fix state backend classloading from configuration
Repository: flink
Updated Branches:
refs/heads/release-1.1 7267562bb -> d619f51ac
[FLINK-4788] [streaming api] Fix state backend classloading from configuration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d619f51a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d619f51a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d619f51a
Branch: refs/heads/release-1.1
Commit: d619f51ac8f922c0cf1d1e789c5141076128f04e
Parents: 7267562
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 10 14:33:57 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 10 14:40:52 2016 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 3 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 47 +++++++++++++++++---
2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 940f699..d56c9bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -684,8 +684,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
backendName = "jobmanager";
}
- backendName = backendName.toLowerCase();
- switch (backendName) {
+ switch (backendName.toLowerCase()) {
case "jobmanager":
LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
stateBackend = MemoryStateBackend.create();
http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e9d583c..83eb4bb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,8 +22,8 @@ import akka.actor.ActorRef;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -41,8 +41,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -85,7 +89,7 @@ public class StreamTaskTest {
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new SlowlyDeserializingOperator());
- Task task = createTask(SourceStreamTask.class, cfg);
+ Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
task.startTaskThread();
// wait until the task thread reached state RUNNING
@@ -120,14 +124,37 @@ public class StreamTaskTest {
}
}
+ @Test
+ public void testStateBackendLoading() throws Exception {
+ Configuration taskManagerConfig = new Configuration();
+ taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+
+ StreamConfig cfg = new StreamConfig(new Configuration());
+ cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+ cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+ task.startTaskThread();
+
+ // wait for clean termination
+ task.getExecutingThread().join();
+ assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+ }
+
+
// ------------------------------------------------------------------------
// Test Utilities
// ------------------------------------------------------------------------
- private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception {
+ private Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ StreamConfig taskConfig,
+ Configuration taskManagerConfig) throws Exception {
+
LibraryCacheManager libCache = mock(LibraryCacheManager.class);
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
+
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -159,8 +186,8 @@ public class StreamTaskTest {
new FiniteDuration(60, TimeUnit.SECONDS),
libCache,
mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
- mock(TaskMetricGroup.class));
+ new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+ new UnregisteredTaskMetricsGroup());
}
// ------------------------------------------------------------------------
@@ -255,4 +282,12 @@ public class StreamTaskTest {
return null;
}
}
+
+ public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
+
+ @Override
+ public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+ return mock(AbstractStateBackend.class);
+ }
+ }
}