You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 07:41:23 UTC

[flink] branch release-1.6 updated (4020519 -> 806d3a4)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4020519  [hotfix][tests] Add file sshd-run.
     new 4529e37  [FLINK-10375] Added wrapped exception as cause
     new 27db334  [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
     new 23221f9  [FLINK-10260] Clean up log messages for TaskExecutor registrations
     new 806d3a4  [FLINK-9567][yarn] Before requesting new containers always check if it is required

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/heartbeat/HeartbeatManagerImpl.java    |  2 +-
 .../chaining/ExceptionInChainedStubException.java  | 11 +++----
 .../runtime/resourcemanager/ResourceManager.java   |  2 +-
 .../resourcemanager/slotmanager/SlotManager.java   |  4 +--
 .../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
 5 files changed, 23 insertions(+), 30 deletions(-)


[flink] 03/04: [FLINK-10260] Clean up log messages for TaskExecutor registrations

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23221f97f3462031fa5374803df1a1d57634ff8d
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Sep 20 14:57:40 2018 +0200

    [FLINK-10260] Clean up log messages for TaskExecutor registrations
    
    Change log level to debug for messages about TaskExecutor re-registration in
    ResourceManager and SlotManager in case of multiple attempts of the TaskExecutor
    to connect to the ResourceManager.
    
    This closes #6720.
---
 .../java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java | 2 +-
 .../org/apache/flink/runtime/resourcemanager/ResourceManager.java     | 2 +-
 .../apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa..15a3757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 	public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
 		if (!stopped) {
 			if (heartbeatTargets.containsKey(resourceID)) {
-				log.info("The target with resource ID {} is already been monitored.", resourceID);
+				log.debug("The target with resource ID {} is already been monitored.", resourceID);
 			} else {
 				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
 					resourceID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 87c4964..ca3f02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -700,7 +700,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
 		if (oldRegistration != null) {
 			// TODO :: suggest old taskExecutor to stop itself
-			log.info("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
+			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
 
 			// remove old task manager registration from slot manager
 			slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f207a36..89b8bcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -322,7 +322,7 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
-		LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
+		LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
 
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -364,7 +364,7 @@ public class SlotManager implements AutoCloseable {
 	public boolean unregisterTaskManager(InstanceID instanceId) {
 		checkInit();
 
-		LOG.info("Unregister TaskManager {} from the SlotManager.", instanceId);
+		LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);
 
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
 


[flink] 01/04: [FLINK-10375] Added wrapped exception as cause

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4529e377eb23155f53fe1c1dd86830098313bc4f
Author: Mike Pedersen <mp...@enversion.dk>
AuthorDate: Thu Sep 20 13:53:56 2018 +0200

    [FLINK-10375] Added wrapped exception as cause
    
    This closes #6719.
---
 .../runtime/operators/chaining/ExceptionInChainedStubException.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf0..e7cebc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@ public class ExceptionInChainedStubException extends RuntimeException {
 	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
-		super();
+		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
 		this.taskName = taskName;
 		this.exception = wrappedException;
 	}


[flink] 02/04: [hotfix] Fix checkstyle violations in ExceptionInChainedStubException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27db3348f6025a22de13802057c029eb29c95627
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 15:23:33 2018 +0200

    [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
---
 .../operators/chaining/ExceptionInChainedStubException.java      | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index e7cebc5..bd55fff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 /**
@@ -24,13 +23,12 @@ package org.apache.flink.runtime.operators.chaining;
  * The exception's only purpose is to be  identifiable as such and to carry the cause exception.
  */
 public class ExceptionInChainedStubException extends RuntimeException {
-	
+
 	private static final long serialVersionUID = -7966910518892776903L;
 
 	private String taskName;
-	
+
 	private Exception exception;
-	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
 		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
@@ -38,11 +36,10 @@ public class ExceptionInChainedStubException extends RuntimeException {
 		this.exception = wrappedException;
 	}
 
-	
 	public String getTaskName() {
 		return taskName;
 	}
-	
+
 	public Exception getWrappedException() {
 		return exception;
 	}


[flink] 04/04: [FLINK-9567][yarn] Before requesting new containers always check if it is required

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 806d3a424c0ad66f49282d0496599597d0f9f0c0
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800

    [FLINK-9567][yarn] Before requesting new containers always check if it is required
    
    By comparing the number of pending slot requests and the number of pending container allocations
    it is possible to say whether we should allocate more containers or not.
    
    This closes #6669.
---
 .../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d051a72..6bc955f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -337,7 +337,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 					}
 					// Eagerly close the connection with task manager.
 					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -446,17 +446,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return new Tuple2<>(host, Integer.valueOf(port));
 	}
 
+	/**
+	 * Request new container if pending containers cannot satisfies pending slot requests.
+	 */
 	private void requestYarnContainer(Resource resource, Priority priority) {
-		resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+		int pendingSlotRequests = getNumberPendingSlotRequests();
+		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+		if (pendingSlotRequests > pendingSlotAllocation) {
+			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
 
-		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+			// make sure we transmit the request fast and receive fast news of granted allocations
+			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-		numPendingContainerRequests++;
+			numPendingContainerRequests++;
 
-		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
-			resource,
-			numPendingContainerRequests);
+			log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+				resource,
+				numPendingContainerRequests);
+		}
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -513,15 +520,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			return priority;
 		}
 	}
-
-	/**
-	 * Request new container if pending containers cannot satisfies pending slot requests.
-	 */
-	private void internalRequestYarnContainer(Resource resource, Priority priority) {
-		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
-		if (pendingSlotRequests > pendingSlotAllocation) {
-			requestYarnContainer(resource, priority);
-		}
-	}
 }