You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/08 02:49:00 UTC

[jira] [Commented] (FLINK-9869) Send PartitionInfo in batch to Improve performance

    [ https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679216#comment-16679216 ] 

ASF GitHub Bot commented on FLINK-9869:
---------------------------------------

TisonKun closed pull request #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to Improve performance
URL: https://github.com/apache/flink/pull/6345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 0458af24c06..83d8abb7d27 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -42,6 +42,11 @@
             <td style="word-wrap: break-word;">6123</td>
             <td>The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers.</td>
         </tr>
+        <tr>
+            <td><h5>jobmanager.update-partition-info.send-interval</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>The interval of send update-partition-info message.</td>
+        </tr>
         <tr>
             <td><h5>jobstore.cache-size</h5></td>
             <td style="word-wrap: break-word;">52428800</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1666f213d18..43091a256b2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -154,6 +154,11 @@
 		.defaultValue(60L * 60L)
 		.withDescription("The time in seconds after which a completed job expires and is purged from the job store.");
 
+	public static final ConfigOption<Long> UPDATE_PARTITION_INFO_SEND_INTERVAL =
+		key("jobmanager.update-partition-info.send-interval")
+		.defaultValue(10L)
+		.withDescription("The interval of send update-partition-info message.");
+
 	/**
 	 * The timeout in milliseconds for requesting a slot from Slot Pool.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..4a157f9cb60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,18 +27,13 @@
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -69,6 +64,8 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.stream.Collectors;
@@ -178,6 +175,10 @@
 
 	// --------------------------------------------------------------------------------------------
 
+	private final Object updatePartitionLock = new Object();
+
+	private ScheduledFuture updatePartitionFuture;
+
 	/**
 	 * Creates a new Execution attempt.
 	 *
@@ -588,24 +589,27 @@ public void deploy() throws JobException {
 
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
-
-			submitResultFuture.whenCompleteAsync(
-				(ack, failure) -> {
-					// only respond to the failure case
-					if (failure != null) {
-						if (failure instanceof TimeoutException) {
-							String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
-
-							markFailed(new Exception(
-								"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
-									+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
-						} else {
-							markFailed(failure);
-						}
-					}
-				},
-				executor);
+			executor.execute(
+				() -> {
+					final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
+
+					submitResultFuture.whenCompleteAsync(
+						(ack, failure) -> {
+							// only respond to the failure case
+							if (failure != null) {
+								if (failure instanceof TimeoutException) {
+									String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
+
+									markFailed(new Exception(
+										"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+											+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
+								} else {
+									markFailed(failure);
+								}
+							}
+						},
+						executor);
+				});
 		}
 		catch (Throwable t) {
 			markFailed(t);
@@ -759,44 +763,11 @@ else if (numConsumers == 0) {
 			// ----------------------------------------------------------------
 			else {
 				if (consumerState == RUNNING) {
-					final LogicalSlot consumerSlot = consumer.getAssignedResource();
-
-					if (consumerSlot == null) {
-						// The consumer has been reset concurrently
-						continue;
-					}
-
-					final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
-							.getCurrentAssignedResource().getTaskManagerLocation();
-					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
-					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-
-					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-					final ResultPartitionLocation partitionLocation;
-
-					if (consumerTaskManager.equals(partitionTaskManager)) {
-						// Consuming task is deployed to the same instance as the partition => local
-						partitionLocation = ResultPartitionLocation.createLocal();
-					}
-					else {
-						// Different instances => remote
-						final ConnectionID connectionId = new ConnectionID(
-								partitionTaskManagerLocation,
-								partition.getIntermediateResult().getConnectionIndex());
-
-						partitionLocation = ResultPartitionLocation.createRemote(connectionId);
-					}
-
-					final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
-							partitionId, partitionLocation);
-
-					consumer.sendUpdatePartitionInfoRpcCall(
-						Collections.singleton(
-							new PartitionInfo(
-								partition.getIntermediateResult().getId(),
-								descriptor)));
+					// cache the partition info and trigger a timer to group them and send in batch
+					final Execution partitionExecution = partition.getProducer()
+						.getCurrentExecutionAttempt();
+					consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
+					consumerVertex.getCurrentExecutionAttempt().sendPartitionInfoAsync();
 				}
 				// ----------------------------------------------------------------
 				// Consumer is scheduled or deploying => cache input channel
@@ -1031,6 +1002,10 @@ void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
 	}
 
 	void sendPartitionInfos() {
+		synchronized (updatePartitionLock) {
+			updatePartitionFuture = null;
+		}
+
 		// check if the ExecutionVertex has already been archived and thus cleared the
 		// partial partition infos queue
 		if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
@@ -1050,6 +1025,17 @@ void sendPartitionInfos() {
 		}
 	}
 
+	void sendPartitionInfoAsync() {
+		synchronized (updatePartitionLock) {
+			if (updatePartitionFuture == null) {
+				updatePartitionFuture = getVertex().getExecutionGraph().getFutureExecutorService().schedule(
+					() -> {
+						sendPartitionInfos();
+					}, vertex.getExecutionGraph().getUpdatePartitionInfoSendInterval(), TimeUnit.MILLISECONDS);
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Internal Actions
 	// --------------------------------------------------------------------------------------------
@@ -1218,16 +1204,21 @@ private void sendUpdatePartitionInfoRpcCall(
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
 
-			CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
-
-			updatePartitionsResultFuture.whenCompleteAsync(
-				(ack, failure) -> {
-					// fail if there was a failure
-					if (failure != null) {
-						fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
-							" failed due to:", failure));
-					}
-				}, executor);
+			executor.execute(
+				() -> {
+					CompletableFuture<Acknowledge> updatePartitionsResultFuture =
+						taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
+
+					updatePartitionsResultFuture.whenCompleteAsync(
+						(ack, failure) -> {
+							// fail if there was a failure
+							if (failure != null) {
+								fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
+									" failed due to:", failure));
+							}
+						}, executor);
+				}
+			);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..9687a640be1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -290,6 +290,8 @@
 	 * available after archiving. */
 	private CheckpointStatsTracker checkpointStatsTracker;
 
+	private long updatePartitionInfoSendInterval;
+
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
@@ -746,6 +748,15 @@ public Executor getFutureExecutor() {
 		return futureExecutor;
 	}
 
+	/**
+	 * Returns the ExecutionContext associated with this ExecutionGraph.
+	 *
+	 * @return ExecutionContext associated with this ExecutionGraph
+	 */
+	public ScheduledExecutorService getFutureExecutorService() {
+		return futureExecutor;
+	}
+
 	/**
 	 * Merges all accumulator results from the tasks previously executed in the Executions.
 	 * @return The accumulator map
@@ -804,6 +815,15 @@ public Executor getFutureExecutor() {
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
 	}
 
+	public long getUpdatePartitionInfoSendInterval() {
+		return updatePartitionInfoSendInterval;
+	}
+
+	public void setUpdatePartitionInfoSendInterval(long updatePartitionInfoSendInterval) {
+		this.updatePartitionInfoSendInterval = updatePartitionInfoSendInterval;
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..0cd32ed6294 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
@@ -367,6 +368,9 @@ public static ExecutionGraph buildGraph(
 
 		executionGraph.getFailoverStrategy().registerMetrics(metrics);
 
+		executionGraph.setUpdatePartitionInfoSendInterval(
+			jobManagerConfig.getLong(JobManagerOptions.UPDATE_PARTITION_INFO_SEND_INTERVAL));
+
 		return executionGraph;
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277941f..649c39d5711 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -159,7 +159,7 @@ public void testBuildDeploymentDescriptor() {
 
 			ExecutionGraph eg = new ExecutionGraph(
 				expectedJobInformation,
-				TestingUtils.defaultExecutor(),
+				new DirectScheduledExecutorService(),
 				TestingUtils.defaultExecutor(),
 				AkkaUtils.getDefaultTimeout(),
 				new NoRestartStrategy(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d91380ed275..3e36d95409e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -317,6 +317,9 @@ public void testFailCallOvertakesDeploymentAnswer() {
 			vertex.deployToSlot(slot);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 
+			// execute the deploy rpc call
+			queue.triggerNextAction();
+
 			Exception testError = new Exception("test error");
 			vertex.fail(testError);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Send PartitionInfo in batch to Improve performance
> --------------------------------------------------
>
>                 Key: FLINK-9869
>                 URL: https://issues.apache.org/jira/browse/FLINK-9869
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 1.5.1
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6
>
>
> ... Currently we send partition info as soon as one arrives. We could `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)