Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/15 07:00:03 UTC

[GitHub] asfgit closed pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

asfgit closed pull request #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void payload) {
 	RestartStrategy getRestartStrategy() {
 		return restartStrategy;
 	}
+
+	@VisibleForTesting
+	ExecutionGraph getExecutionGraph() {
+		return executionGraph;
+	}
 }
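
For orientation, here is a minimal usage sketch of the new @VisibleForTesting accessor, assuming a started JobMaster and the single-vertex job used by the test further below; the variable names are illustrative and not part of the patch:

	// Hypothetical usage: inspect the job's single execution vertex via the new accessor.
	ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
	ExecutionVertex vertex = executionGraph.getAllExecutionVertices().iterator().next();
	ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();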
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Waits until the job has reached a certain state.
+	 * Waits until the Job has reached a certain state.
 	 *
 	 * <p>This method is based on polling and might miss very fast state transitions!
 	 */
 	public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis)
 			throws TimeoutException {
-
 		checkNotNull(eg);
 		checkNotNull(status);
 		checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
 		}
 
 		if (System.nanoTime() >= deadline) {
-			throw new TimeoutException("The job did not reach status " + status + " in time. Current status is " + eg.getState() + '.');
+			throw new TimeoutException(
+				String.format("The job did not reach status %s in time. Current status is %s.",
+					status, eg.getState()));
 		}
 	}
 
@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long
 	 */
 	public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis)
 			throws TimeoutException {
-
 		checkNotNull(execution);
 		checkNotNull(state);
 		checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution execution, ExecutionState s
 		}
 
 		if (System.nanoTime() >= deadline) {
-			throw new TimeoutException();
+			throw new TimeoutException(
+				String.format("The execution did not reach state %s in time. Current state is %s.",
+					state, execution.getState()));
+		}
+	}
+
+	/**
+	 * Waits until the ExecutionVertex has reached a certain state.
+	 *
+	 * <p>This method is based on polling and might miss very fast state transitions!
+	 */
+	public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis)
+		throws TimeoutException {
+		checkNotNull(executionVertex);
+		checkNotNull(state);
+		checkArgument(maxWaitMillis >= 0);
+
+		// this is a poor implementation - we may want to improve it eventually
+		final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE : System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+		while (true) {
+			Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+			if (execution == null || (execution.getState() != state && System.nanoTime() < deadline)) {
+				try {
+					Thread.sleep(2);
+				} catch (InterruptedException ignored) { }
+			} else {
+				break;
+			}
+
+			if (System.nanoTime() >= deadline) {
+				if (execution != null) {
+					throw new TimeoutException(
+						String.format("The execution vertex did not reach state %s in time. Current state is %s.",
+							state, execution.getState()));
+				} else {
+					throw new TimeoutException(
+						"Cannot get current execution attempt of " + executionVertex + '.');
+				}
+			}
 		}
 	}
 
@@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate(
 
 	public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis)
 			throws TimeoutException {
-
 		checkNotNull(region);
 		checkNotNull(status);
 		checkArgument(maxWaitMillis >= 0);
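
A minimal usage sketch of the new helper, mirroring how the JobMasterTest below calls it (vertex is assumed to be an ExecutionVertex of the running job; the two-second timeout matches the test):

	// Hypothetical usage: poll until the vertex's current execution attempt is SCHEDULED,
	// throwing a TimeoutException if that does not happen within two seconds.
	ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.SCHEDULED, 2000L);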
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 891ff82c413..578c9066e95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,8 +24,12 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -46,6 +50,9 @@
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -60,6 +67,7 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -84,6 +92,7 @@
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
@@ -113,6 +122,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -678,6 +688,133 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
 		}
 	}
 
+	@Test
+	public void testRequestNextInputSplit() throws Exception {
+		// build one node JobGraph
+		InputSplitSource<TestingInputSplit> inputSplitSource = new TestingInputSplitSource();
+
+		JobVertex source = new JobVertex("vertex1");
+		source.setParallelism(1);
+		source.setInputSplitSource(inputSplitSource);
+		source.setInvokableClass(AbstractInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph(source);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+		configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+		final JobManagerSharedServices jobManagerSharedServices =
+			new TestingJobManagerSharedServicesBuilder()
+				.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+				.build();
+
+		final JobMaster jobMaster = createJobMaster(
+			configuration,
+			jobGraph,
+			haServices,
+			jobManagerSharedServices);
+
+		CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+
+		try {
+			// wait for the start to complete
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+			ExecutionGraph eg = jobMaster.getExecutionGraph();
+			ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+
+			SerializedInputSplit serializedInputSplit1 = jobMasterGateway
+				.requestNextInputSplit(
+					source.getID(),
+					ev.getCurrentExecutionAttempt().getAttemptId())
+				.get(1L, TimeUnit.SECONDS);
+			InputSplit inputSplit1 = InstantiationUtil
+				.deserializeObject(
+					serializedInputSplit1.getInputSplitData(),
+					ClassLoader.getSystemClassLoader());
+			assertEquals(0, inputSplit1.getSplitNumber());
+
+			SerializedInputSplit serializedInputSplit2 = jobMasterGateway
+				.requestNextInputSplit(
+					source.getID(),
+					ev.getCurrentExecutionAttempt().getAttemptId())
+				.get(1L, TimeUnit.SECONDS);
+			InputSplit inputSplit2 = InstantiationUtil
+				.deserializeObject(
+					serializedInputSplit2.getInputSplitData(),
+					ClassLoader.getSystemClassLoader());
+			assertEquals(1, inputSplit2.getSplitNumber());
+
+			ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+			eg.failGlobal(new Exception("Testing exception"));
+
+			ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, 2000L);
+
+			SerializedInputSplit serializedInputSplit3 = jobMasterGateway
+				.requestNextInputSplit(
+					source.getID(),
+					ev.getCurrentExecutionAttempt().getAttemptId())
+				.get(1L, TimeUnit.SECONDS);
+			InputSplit inputSplit3 = InstantiationUtil
+				.deserializeObject(
+					serializedInputSplit3.getInputSplitData(),
+					ClassLoader.getSystemClassLoader());
+			assertEquals(0, inputSplit3.getSplitNumber());
+
+			SerializedInputSplit serializedInputSplit4 = jobMasterGateway
+				.requestNextInputSplit(
+					source.getID(),
+					ev.getCurrentExecutionAttempt().getAttemptId())
+				.get(1L, TimeUnit.SECONDS);
+			InputSplit inputSplit4 = InstantiationUtil
+				.deserializeObject(
+					serializedInputSplit4.getInputSplitData(),
+					ClassLoader.getSystemClassLoader());
+			assertEquals(1, inputSplit4.getSplitNumber());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
+	private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
+		@Override
+		public TestingInputSplit[] createInputSplits(int minNumSplits) {
+			return new TestingInputSplit[0];
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] inputSplits) {
+			return new TestingInputSplitAssigner();
+		}
+	}
+
+	private static final class TestingInputSplitAssigner implements InputSplitAssigner {
+
+		private int splitIndex = 0;
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId){
+			return new TestingInputSplit(splitIndex++);
+		}
+	}
+
+	private static final class TestingInputSplit implements InputSplit {
+
+		private final int splitNumber;
+
+		TestingInputSplit(int number) {
+			this.splitNumber = number;
+		}
+
+		public int getSplitNumber() {
+			return splitNumber;
+		}
+	}
+
 	/**
 	 * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
 	 * call for a finished result partition.
@@ -708,9 +845,9 @@ public void testRequestPartitionState() throws Exception {
 			final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
 			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
 				.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-                    tddFuture.complete(taskDeploymentDescriptor);
-                    return CompletableFuture.completedFuture(Acknowledge.get());
-                })
+					  tddFuture.complete(taskDeploymentDescriptor);
+					  return CompletableFuture.completedFuture(Acknowledge.get());
+				  })
 				.createTestingTaskExecutorGateway();
 			rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services