You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 09:26:05 UTC
[flink] branch release-1.5 updated (bcbcafe -> 69cae4f)
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.
from bcbcafe [hotfix][docs] Regenerate docs to fix curl example for uploading jars.
new c94b125 [FLINK-10375] Added wrapped exception as cause
new 946f1f1 [hotfix] Fix checkstyle violations in ExceptionInChainedStubException
new 8dee862 [FLINK-10260] Clean up log messages for TaskExecutor registrations
new 69cae4f [FLINK-9567][yarn] Before requesting new containers always check if it is required
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../runtime/heartbeat/HeartbeatManagerImpl.java | 2 +-
.../chaining/ExceptionInChainedStubException.java | 11 +++----
.../runtime/resourcemanager/ResourceManager.java | 2 +-
.../resourcemanager/slotmanager/SlotManager.java | 4 +--
.../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++------------
5 files changed, 24 insertions(+), 31 deletions(-)
[flink] 04/04: [FLINK-9567][yarn] Before requesting new containers
always check if it is required
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69cae4f6daf6580ca83c452c7c3ad33489551a36
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800
[FLINK-9567][yarn] Before requesting new containers always check if it is required
By comparing the number of pending slot requests and the number of pending container allocations
it is possible to say whether we should allocate more containers or not.
This closes #6669.
---
.../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++------------
1 file changed, 16 insertions(+), 20 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2b4e9fb..30d7945 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -336,9 +336,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
if (yarnWorkerNode != null) {
// Container completed unexpectedly ~> start a new one
final Container container = yarnWorkerNode.getContainer();
- internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
- closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
+ requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
}
+ closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
}
}
);
@@ -444,17 +444,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
return new Tuple2<>(host, Integer.valueOf(port));
}
+ /**
+ * Request new container if pending containers cannot satisfies pending slot requests.
+ */
private void requestYarnContainer(Resource resource, Priority priority) {
- resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+ int pendingSlotRequests = getNumberPendingSlotRequests();
+ int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+ if (pendingSlotRequests > pendingSlotAllocation) {
+ resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
- // make sure we transmit the request fast and receive fast news of granted allocations
- resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+ // make sure we transmit the request fast and receive fast news of granted allocations
+ resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
- numPendingContainerRequests++;
+ numPendingContainerRequests++;
- log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
- resource,
- numPendingContainerRequests);
+ log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+ resource,
+ numPendingContainerRequests);
+ }
}
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -511,15 +518,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
return priority;
}
}
-
- /**
- * Request new container if pending containers cannot satisfies pending slot requests.
- */
- private void internalRequestYarnContainer(Resource resource, Priority priority) {
- int pendingSlotRequests = getNumberPendingSlotRequests();
- int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
- if (pendingSlotRequests > pendingSlotAllocation) {
- requestYarnContainer(resource, priority);
- }
- }
}
[flink] 03/04: [FLINK-10260] Clean up log messages for TaskExecutor
registrations
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8dee8625502a828a303b8ec4180ab00b7e794334
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Sep 20 14:57:40 2018 +0200
[FLINK-10260] Clean up log messages for TaskExecutor registrations
Change log level to debug for messages about TaskExecutor re-registration in
ResourceManager and SlotManager in case of multiple attempts of the TaskExecutor
to connect to the ResourceManager.
This closes #6720.
---
.../java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java | 2 +-
.../org/apache/flink/runtime/resourcemanager/ResourceManager.java | 2 +-
.../apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index 242fbaa..15a3757 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -118,7 +118,7 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
if (!stopped) {
if (heartbeatTargets.containsKey(resourceID)) {
- log.info("The target with resource ID {} is already been monitored.", resourceID);
+ log.debug("The target with resource ID {} is already been monitored.", resourceID);
} else {
HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
resourceID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 87c4964..ca3f02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -700,7 +700,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
- log.info("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
+ log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId);
// remove old task manager registration from slot manager
slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f207a36..89b8bcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -322,7 +322,7 @@ public class SlotManager implements AutoCloseable {
public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
- LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
+ LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
@@ -364,7 +364,7 @@ public class SlotManager implements AutoCloseable {
public boolean unregisterTaskManager(InstanceID instanceId) {
checkInit();
- LOG.info("Unregister TaskManager {} from the SlotManager.", instanceId);
+ LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
[flink] 02/04: [hotfix] Fix checkstyle violations in
ExceptionInChainedStubException
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 946f1f152a57060237cc5f1af6b94695a117e79e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 20 15:23:33 2018 +0200
[hotfix] Fix checkstyle violations in ExceptionInChainedStubException
---
.../operators/chaining/ExceptionInChainedStubException.java | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index e7cebc5..bd55fff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.chaining;
/**
@@ -24,13 +23,12 @@ package org.apache.flink.runtime.operators.chaining;
* The exception's only purpose is to be identifiable as such and to carry the cause exception.
*/
public class ExceptionInChainedStubException extends RuntimeException {
-
+
private static final long serialVersionUID = -7966910518892776903L;
private String taskName;
-
+
private Exception exception;
-
public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
@@ -38,11 +36,10 @@ public class ExceptionInChainedStubException extends RuntimeException {
this.exception = wrappedException;
}
-
public String getTaskName() {
return taskName;
}
-
+
public Exception getWrappedException() {
return exception;
}
[flink] 01/04: [FLINK-10375] Added wrapped exception as cause
Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c94b125d1ae143d92cadc93d388703f58e723bae
Author: Mike Pedersen <mp...@enversion.dk>
AuthorDate: Thu Sep 20 13:53:56 2018 +0200
[FLINK-10375] Added wrapped exception as cause
This closes #6719.
---
.../runtime/operators/chaining/ExceptionInChainedStubException.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
index 11b5bf0..e7cebc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ExceptionInChainedStubException.java
@@ -33,7 +33,7 @@ public class ExceptionInChainedStubException extends RuntimeException {
public ExceptionInChainedStubException(String taskName, Exception wrappedException) {
- super();
+ super("Exception in chained task '" + taskName + "'", exceptionUnwrap(wrappedException));
this.taskName = taskName;
this.exception = wrappedException;
}