You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/10 12:51:02 UTC

flink git commit: [FLINK-4788] [streaming api] Fix state backend classloading from configuration

Repository: flink
Updated Branches:
  refs/heads/release-1.1 7267562bb -> d619f51ac


[FLINK-4788] [streaming api] Fix state backend classloading from configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d619f51a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d619f51a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d619f51a

Branch: refs/heads/release-1.1
Commit: d619f51ac8f922c0cf1d1e789c5141076128f04e
Parents: 7267562
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 10 14:33:57 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 10 14:40:52 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 47 +++++++++++++++++---
 2 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 940f699..d56c9bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -684,8 +684,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				backendName = "jobmanager";
 			}
 
-			backendName = backendName.toLowerCase();
-			switch (backendName) {
+			switch (backendName.toLowerCase()) {
 				case "jobmanager":
 					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
 					stateBackend = MemoryStateBackend.create();

http://git-wip-us.apache.org/repos/asf/flink/blob/d619f51a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index e9d583c..83eb4bb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -22,8 +22,8 @@ import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -41,8 +41,12 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -85,7 +89,7 @@ public class StreamTaskTest {
 			StreamConfig cfg = new StreamConfig(new Configuration());
 			cfg.setStreamOperator(new SlowlyDeserializingOperator());
 			
-			Task task = createTask(SourceStreamTask.class, cfg);
+			Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
 			task.startTaskThread();
 			
 			// wait until the task thread reached state RUNNING 
@@ -120,14 +124,37 @@ public class StreamTaskTest {
 		}
 	}
 
+	@Test
+	public void testStateBackendLoading() throws Exception {
+		Configuration taskManagerConfig = new Configuration();
+		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+		task.startTaskThread();
+
+		// wait for clean termination
+		task.getExecutingThread().join();
+		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Test Utilities
 	// ------------------------------------------------------------------------
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig) throws Exception {
+	private Task createTask(
+			Class<? extends AbstractInvokable> invokable,
+			StreamConfig taskConfig,
+			Configuration taskManagerConfig) throws Exception {
+
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-		
+
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
@@ -159,8 +186,8 @@ public class StreamTaskTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-				mock(TaskMetricGroup.class));
+				new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+				new UnregisteredTaskMetricsGroup());
 	}
 	
 	// ------------------------------------------------------------------------
@@ -255,4 +282,12 @@ public class StreamTaskTest {
 			return null;
 		}
 	}
+
+	public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend> {
+
+		@Override
+		public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+			return mock(AbstractStateBackend.class);
+		}
+	}
 }