You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 09:26:05 UTC

[flink] branch release-1.5 updated (bcbcafe -> 69cae4f)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from bcbcafe  [hotfix][docs] Regenerate docs to fix curl example for uploading jars.
     new c94b125  [FLINK-10375] Added wrapped exception as cause
     new 946f1f1  [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
     new 8dee862  [FLINK-10260] Clean up log messages for TaskExecutor registrations
     new 69cae4f  [FLINK-9567][yarn] Before requesting new containers always check if it is required

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/heartbeat/HeartbeatManagerImpl.java    |  2 +-
 .../chaining/ExceptionInChainedStubException.java  | 11 +++----
 .../runtime/resourcemanager/ResourceManager.java   |  2 +-
 .../resourcemanager/slotmanager/SlotManager.java   |  4 +--
 .../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++------------
 5 files changed, 24 insertions(+), 31 deletions(-)


[flink] 04/04: [FLINK-9567][yarn] Before requesting new containers always check if it is required

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69cae4f6daf6580ca83c452c7c3ad33489551a36
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800

    [FLINK-9567][yarn] Before requesting new containers always check if it is required
    
    By comparing the number of pending slot requests and the number of pending container allocations
    it is possible to say whether we should allocate more containers or not.
    
    This closes #6669.
---
 .../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2b4e9fb..30d7945 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -336,9 +336,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
-						closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
+						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 					}
+					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
 				}
 			}
 		);
@@ -444,17 +444,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return new Tuple2<>(host, Integer.valueOf(port));
 	}
 
+	/**
+	 * Request new container if pending containers cannot satisfies pending slot requests.
+	 */
 	private void requestYarnContainer(Resource resource, Priority priority) {
-		resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+		int pendingSlotRequests = getNumberPendingSlotRequests();
+		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+		if (pendingSlotRequests > pendingSlotAllocation) {
+			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
 
-		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+			// make sure we transmit the request fast and receive fast news of granted allocations
+			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-		numPendingContainerRequests++;
+			numPendingContainerRequests++;
 
-		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
-			resource,
-			numPendingContainerRequests);
+			log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+				resource,
+				numPendingContainerRequests);
+		}
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -511,15 +518,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			return priority;
 		}
 	}
-
-	/**
-	 * Request new container if pending containers cannot satisfies pending slot requests.
-	 */
-	private void internalRequestYarnContainer(Resource resource, Priority priority) {
-		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
-		if (pendingSlotRequests > pendingSlotAllocation) {
-			requestYarnContainer(resource, priority);
-		}
-	}
 }


[flink] 03/04: [FLINK-10260] Clean up log messages for TaskExecutor registrations

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8dee8625502a828a303b8ec4180ab00b7e794334
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Sep 20 14:57:40 2018 +0200

    [FLINK-10260] Clean up log messages for TaskExecutor registrations
    
    Change log level to debug for messages about TaskExecutor re-registration in
    ResourceManager and SlotManager in case of multiple attempts of the TaskExecutor
    to connect to the ResourceManager.
    
    This closes #6720.
---
 .../java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java | 2 +-
 .../org/apache/flink/runtime/resourcemanager/ResourceManager.java     | 2 +-
 .../apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa..15a3757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 	public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
 		if (!stopped) {
 			if (heartbeatTargets.containsKey(resourceID)) {
-				log.info("The target with resource ID {} is already been monitored.", resourceID);
+				log.debug("The target with resource ID {} is already been monitored.", resourceID);
 			} else {
 				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
 					resourceID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 87c4964..ca3f02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -700,7 +700,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
 		if (oldRegistration != null) {
 			// TODO :: suggest old taskExecutor to stop itself
-			log.info("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
+			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
 
 			// remove old task manager registration from slot manager
 			slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f207a36..89b8bcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -322,7 +322,7 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
-		LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
+		LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
 
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -364,7 +364,7 @@ public class SlotManager implements AutoCloseable {
 	public boolean unregisterTaskManager(InstanceID instanceId) {
 		checkInit();
 
-		LOG.info("Unregister TaskManager {} from the SlotManager.", instanceId);
+		LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);
 
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
 


[flink] 02/04: [hotfix] Fix checkstyle violations in ExceptionInChainedStubException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 946f1f152a57060237cc5f1af6b94695a117e79e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 15:23:33 2018 +0200

    [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
---
 .../operators/chaining/ExceptionInChainedStubException.java      | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index e7cebc5..bd55fff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 /**
@@ -24,13 +23,12 @@ package org.apache.flink.runtime.operators.chaining;
  * The exception's only purpose is to be  identifiable as such and to carry the cause exception.
  */
 public class ExceptionInChainedStubException extends RuntimeException {
-	
+
 	private static final long serialVersionUID = -7966910518892776903L;
 
 	private String taskName;
-	
+
 	private Exception exception;
-	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
 		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
@@ -38,11 +36,10 @@ public class ExceptionInChainedStubException extends RuntimeException {
 		this.exception = wrappedException;
 	}
 
-	
 	public String getTaskName() {
 		return taskName;
 	}
-	
+
 	public Exception getWrappedException() {
 		return exception;
 	}


[flink] 01/04: [FLINK-10375] Added wrapped exception as cause

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c94b125d1ae143d92cadc93d388703f58e723bae
Author: Mike Pedersen <mp...@enversion.dk>
AuthorDate: Thu Sep 20 13:53:56 2018 +0200

    [FLINK-10375] Added wrapped exception as cause
    
    This closes #6719.
---
 .../runtime/operators/chaining/ExceptionInChainedStubException.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf0..e7cebc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@ public class ExceptionInChainedStubException extends RuntimeException {
 	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
-		super();
+		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
 		this.taskName = taskName;
 		this.exception = wrappedException;
 	}