You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 07:38:49 UTC

[flink] branch master updated (61cfad7 -> 2726dad)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 61cfad7  [hotfix][tests] Add file sshd-run.
     new f1511a3  [FLINK-10375] Added wrapped exception as cause
     new da3c6c4  [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
     new 542e8cc  [FLINK-10260] Clean up log messages for TaskExecutor registrations
     new 0cf776b  [FLINK-9567][yarn] Before requesting new containers always check if it is required
     new 2726dad  [hotfix] Bump japicmp reference version to 1.6.1

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/heartbeat/HeartbeatManagerImpl.java    |  2 +-
 .../chaining/ExceptionInChainedStubException.java  | 11 +++----
 .../runtime/resourcemanager/ResourceManager.java   |  2 +-
 .../resourcemanager/slotmanager/SlotManager.java   |  4 +--
 .../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
 pom.xml                                            |  2 +-
 6 files changed, 24 insertions(+), 31 deletions(-)


[flink] 01/05: [FLINK-10375] Added wrapped exception as cause

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1511a3602690f2c44d4feb78b8961e31d345326
Author: Mike Pedersen <mp...@enversion.dk>
AuthorDate: Thu Sep 20 13:53:56 2018 +0200

    [FLINK-10375] Added wrapped exception as cause
    
    This closes #6719.
---
 .../runtime/operators/chaining/ExceptionInChainedStubException.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf0..e7cebc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@ public class ExceptionInChainedStubException extends RuntimeException {
 	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
-		super();
+		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
 		this.taskName = taskName;
 		this.exception = wrappedException;
 	}


[flink] 03/05: [FLINK-10260] Clean up log messages for TaskExecutor registrations

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 542e8cc22290984b4b2e32577430ceb82dd82fde
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Sep 20 14:57:40 2018 +0200

    [FLINK-10260] Clean up log messages for TaskExecutor registrations
    
    Change log level to debug for messages about TaskExecutor re-registration in
    ResourceManager and SlotManager in case of multiple attempts of the TaskExecutor
    to connect to the ResourceManager.
    
    This closes #6720.
---
 .../java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java | 2 +-
 .../org/apache/flink/runtime/resourcemanager/ResourceManager.java     | 2 +-
 .../apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa..15a3757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
 	public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
 		if (!stopped) {
 			if (heartbeatTargets.containsKey(resourceID)) {
-				log.info("The target with resource ID {} is already been monitored.", resourceID);
+				log.debug("The target with resource ID {} is already been monitored.", resourceID);
 			} else {
 				HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
 					resourceID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3984483..ac1181b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -694,7 +694,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
 		if (oldRegistration != null) {
 			// TODO :: suggest old taskExecutor to stop itself
-			log.info("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
+			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
 
 			// remove old task manager registration from slot manager
 			slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d54d143..bab5660 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -324,7 +324,7 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
-		LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
+		LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
 
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -366,7 +366,7 @@ public class SlotManager implements AutoCloseable {
 	public boolean unregisterTaskManager(InstanceID instanceId) {
 		checkInit();
 
-		LOG.info("Unregister TaskManager {} from the SlotManager.", instanceId);
+		LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);
 
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
 


[flink] 02/05: [hotfix] Fix checkstyle violations in ExceptionInChainedStubException

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da3c6c4e1b079b974c5046e0e6d864ea167a3a8d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 15:23:33 2018 +0200

    [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
---
 .../operators/chaining/ExceptionInChainedStubException.java      | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index e7cebc5..bd55fff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 /**
@@ -24,13 +23,12 @@ package org.apache.flink.runtime.operators.chaining;
  * The exception's only purpose is to be  identifiable as such and to carry the cause exception.
  */
 public class ExceptionInChainedStubException extends RuntimeException {
-	
+
 	private static final long serialVersionUID = -7966910518892776903L;
 
 	private String taskName;
-	
+
 	private Exception exception;
-	
 
 	public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
 		super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
@@ -38,11 +36,10 @@ public class ExceptionInChainedStubException extends RuntimeException {
 		this.exception = wrappedException;
 	}
 
-	
 	public String getTaskName() {
 		return taskName;
 	}
-	
+
 	public Exception getWrappedException() {
 		return exception;
 	}


[flink] 04/05: [FLINK-9567][yarn] Before requesting new containers always check if it is required

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0cf776be4483e7f939de0a7f2b1fe3263a14c6fb
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800

    [FLINK-9567][yarn] Before requesting new containers always check if it is required
    
    By comparing the number of pending slot requests and the number of pending container allocations
    it is possible to say whether we should allocate more containers or not.
    
    This closes #6669.
---
 .../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index cf3588f..956e40f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 					}
 					// Eagerly close the connection with task manager.
 					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -443,17 +443,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return new Tuple2<>(host, Integer.valueOf(port));
 	}
 
+	/**
+	 * Request new container if pending containers cannot satisfies pending slot requests.
+	 */
 	private void requestYarnContainer(Resource resource, Priority priority) {
-		resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+		int pendingSlotRequests = getNumberPendingSlotRequests();
+		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+		if (pendingSlotRequests > pendingSlotAllocation) {
+			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
 
-		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+			// make sure we transmit the request fast and receive fast news of granted allocations
+			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-		numPendingContainerRequests++;
+			numPendingContainerRequests++;
 
-		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
-			resource,
-			numPendingContainerRequests);
+			log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+				resource,
+				numPendingContainerRequests);
+		}
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -510,15 +517,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			return priority;
 		}
 	}
-
-	/**
-	 * Request new container if pending containers cannot satisfies pending slot requests.
-	 */
-	private void internalRequestYarnContainer(Resource resource, Priority priority) {
-		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
-		if (pendingSlotRequests > pendingSlotAllocation) {
-			requestYarnContainer(resource, priority);
-		}
-	}
 }


[flink] 05/05: [hotfix] Bump japicmp reference version to 1.6.1

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2726dad229db59f14709df768203251732540503
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 17:09:10 2018 +0200

    [hotfix] Bump japicmp reference version to 1.6.1
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 326e9ec..6f26635 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1558,7 +1558,7 @@ under the License.
 							<dependency>
 								<groupId>org.apache.flink</groupId>
 								<artifactId>${project.artifactId}</artifactId>
-								<version>1.6.0</version>
+								<version>1.6.1</version>
 								<type>${project.packaging}</type>
 							</dependency>
 						</oldVersion>