You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/01 17:06:50 UTC
[34/50] hadoop git commit: YARN-7974. Allow updating application
tracking url after registration. Contributed by Jonathan Hung
YARN-7974. Allow updating application tracking url after registration. Contributed by Jonathan Hung
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3e06a5dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3e06a5dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3e06a5dc
Branch: refs/heads/HDFS-12943
Commit: 3e06a5dcea8224ba71aec284df23b47d536bb06d
Parents: ee53602
Author: Jonathan Hung <jh...@linkedin.com>
Authored: Mon Jul 30 17:41:01 2018 -0700
Committer: Jonathan Hung <jh...@linkedin.com>
Committed: Mon Jul 30 17:44:18 2018 -0700
----------------------------------------------------------------------
.../api/protocolrecords/AllocateRequest.java | 47 +++++++++++-
.../src/main/proto/yarn_service_protos.proto | 1 +
.../hadoop/yarn/client/api/AMRMClient.java | 11 +++
.../yarn/client/api/async/AMRMClientAsync.java | 11 +++
.../api/async/impl/AMRMClientAsyncImpl.java | 5 ++
.../yarn/client/api/impl/AMRMClientImpl.java | 11 +++
.../yarn/client/api/impl/TestAMRMClient.java | 77 ++++++++++++++++++++
.../impl/pb/AllocateRequestPBImpl.java | 27 ++++++-
.../resourcemanager/DefaultAMSProcessor.java | 2 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 20 +++++
.../event/RMAppAttemptStatusupdateEvent.java | 11 +++
.../TestApplicationMasterService.java | 34 +++++++++
.../server/resourcemanager/TestRMRestart.java | 45 ++++++++++++
13 files changed, 298 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
index eee50e3..799088b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java
@@ -73,7 +73,21 @@ public abstract class AllocateRequest {
.releaseList(containersToBeReleased)
.resourceBlacklistRequest(resourceBlacklistRequest).build();
}
-
+
+ @Public
+ @Unstable
+ public static AllocateRequest newInstance(int responseID, float appProgress,
+ List<ResourceRequest> resourceAsk,
+ List<ContainerId> containersToBeReleased,
+ ResourceBlacklistRequest resourceBlacklistRequest,
+ String trackingUrl) {
+ return AllocateRequest.newBuilder().responseId(responseID)
+ .progress(appProgress).askList(resourceAsk)
+ .releaseList(containersToBeReleased)
+ .resourceBlacklistRequest(resourceBlacklistRequest)
+ .trackingUrl(trackingUrl).build();
+ }
+
@Public
@Unstable
public static AllocateRequest newInstance(int responseID, float appProgress,
@@ -240,6 +254,22 @@ public abstract class AllocateRequest {
List<SchedulingRequest> schedulingRequests) {
}
+ /**
+ * Get the tracking url update for this heartbeat.
+ * @return tracking url to update this application with
+ */
+ @Public
+ @Unstable
+ public abstract String getTrackingUrl();
+
+ /**
+ * Set the new tracking url for this application.
+ * @param trackingUrl the new tracking url
+ */
+ @Public
+ @Unstable
+ public abstract void setTrackingUrl(String trackingUrl);
+
@Public
@Unstable
public static AllocateRequestBuilder newBuilder() {
@@ -356,6 +386,19 @@ public abstract class AllocateRequest {
}
/**
+ * Set the <code>trackingUrl</code> of the request.
+ * @see AllocateRequest#setTrackingUrl(String)
+ * @param trackingUrl new tracking url
+ * @return {@link AllocateRequestBuilder}
+ */
+ @Public
+ @Unstable
+ public AllocateRequestBuilder trackingUrl(String trackingUrl) {
+ allocateRequest.setTrackingUrl(trackingUrl);
+ return this;
+ }
+
+ /**
* Return generated {@link AllocateRequest} object.
* @return {@link AllocateRequest}
*/
@@ -365,4 +408,4 @@ public abstract class AllocateRequest {
return allocateRequest;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 92a65ad..acd452d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -92,6 +92,7 @@ message AllocateRequestProto {
optional float progress = 5;
repeated UpdateContainerRequestProto update_requests = 7;
repeated SchedulingRequestProto scheduling_requests = 10;
+ optional string tracking_url = 11;
}
message NMTokenProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 32aa21d..59b3353 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -805,6 +805,17 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
}
/**
+ * Update application's tracking url on next heartbeat.
+ *
+ * @param trackingUrl new tracking url for this application
+ */
+ @Public
+ @InterfaceStability.Unstable
+ public void updateTrackingUrl(String trackingUrl) {
+ // Unimplemented.
+ }
+
+ /**
* Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(java.util.function.Supplier, int)}
* and {@link #waitFor(java.util.function.Supplier, int, int)}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 0af687b..3dd2f71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -413,6 +413,17 @@ extends AbstractService {
List<String> blacklistRemovals);
/**
+ * Update application's tracking url on next heartbeat.
+ *
+ * @param trackingUrl new tracking url for this application
+ */
+ @Public
+ @Unstable
+ public void updateTrackingUrl(String trackingUrl) {
+ // Unimplemented.
+ }
+
+ /**
* Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(java.util.function.Supplier, int)}
* and {@link #waitFor(java.util.function.Supplier, int, int)}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 4f04b66..3cf2c34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -286,6 +286,11 @@ extends AMRMClientAsync<T> {
List<String> blacklistRemovals) {
client.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
+
+ @Override
+ public void updateTrackingUrl(String trackingUrl) {
+ client.updateTrackingUrl(trackingUrl);
+ }
private class HeartbeatThread extends Thread {
public HeartbeatThread() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 7265d24..6dcecde 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -99,6 +99,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected String appHostName;
protected int appHostPort;
protected String appTrackingUrl;
+ protected String newTrackingUrl;
protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources;
@@ -308,6 +309,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
.releaseList(releaseList).updateRequests(updateList)
.schedulingRequests(schedulingRequestList).build();
+ if (this.newTrackingUrl != null) {
+ allocateRequest.setTrackingUrl(this.newTrackingUrl);
+ this.appTrackingUrl = this.newTrackingUrl;
+ this.newTrackingUrl = null;
+ }
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
@@ -1008,6 +1014,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
+ @Override
+ public synchronized void updateTrackingUrl(String trackingUrl) {
+ this.newTrackingUrl = trackingUrl;
+ }
+
private void updateAMRMToken(Token token) throws IOException {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 8dda8b4..cf83779 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -79,6 +81,7 @@ import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.eclipse.jetty.util.log.Log;
@@ -1994,4 +1997,78 @@ public class TestAMRMClient extends BaseAMRMClientTest{
}
}
}
+
+ @Test(timeout = 60000)
+ public void testNoUpdateTrackingUrl() {
+ try {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ amClient = new AMRMClientImpl<>();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ assertEquals("", amClient.appTrackingUrl);
+
+ ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
+ AllocateResponse mockResponse = mock(AllocateResponse.class);
+ when(mockRM.allocate(any(AllocateRequest.class)))
+ .thenReturn(mockResponse);
+ ApplicationMasterProtocol realRM = amClient.rmClient;
+ amClient.rmClient = mockRM;
+ // Do allocate without updated tracking url
+ amClient.allocate(0.1f);
+ ArgumentCaptor<AllocateRequest> argument =
+ ArgumentCaptor.forClass(AllocateRequest.class);
+ verify(mockRM).allocate(argument.capture());
+ assertNull(argument.getValue().getTrackingUrl());
+
+ amClient.rmClient = realRM;
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+ } catch (IOException | YarnException e) {
+ throw new AssertionError(
+ "testNoUpdateTrackingUrl unexpectedly threw exception: " + e);
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testUpdateTrackingUrl() {
+ try {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ amClient = new AMRMClientImpl<>();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ String trackingUrl = "hadoop.apache.org";
+ assertEquals("", amClient.appTrackingUrl);
+
+ ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
+ AllocateResponse mockResponse = mock(AllocateResponse.class);
+ when(mockRM.allocate(any(AllocateRequest.class)))
+ .thenReturn(mockResponse);
+ ApplicationMasterProtocol realRM = amClient.rmClient;
+ amClient.rmClient = mockRM;
+ // Do allocate with updated tracking url
+ amClient.updateTrackingUrl(trackingUrl);
+ assertEquals(trackingUrl, amClient.newTrackingUrl);
+ assertEquals("", amClient.appTrackingUrl);
+ amClient.allocate(0.1f);
+ assertNull(amClient.newTrackingUrl);
+ assertEquals(trackingUrl, amClient.appTrackingUrl);
+ ArgumentCaptor<AllocateRequest> argument
+ = ArgumentCaptor.forClass(AllocateRequest.class);
+ verify(mockRM).allocate(argument.capture());
+ assertEquals(trackingUrl, argument.getValue().getTrackingUrl());
+
+ amClient.rmClient = realRM;
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+ } catch (IOException | YarnException e) {
+ throw new AssertionError(
+ "testUpdateTrackingUrl unexpectedly threw exception: " + e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
index 50672a3..b5360a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java
@@ -58,6 +58,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private List<UpdateContainerRequest> updateRequests = null;
private List<SchedulingRequest> schedulingRequests = null;
private ResourceBlacklistRequest blacklistRequest = null;
+ private String trackingUrl = null;
public AllocateRequestPBImpl() {
builder = AllocateRequestProto.newBuilder();
@@ -111,6 +112,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
}
+ if (this.trackingUrl != null) {
+ builder.setTrackingUrl(this.trackingUrl);
+ }
}
private void mergeLocalToProto() {
@@ -398,7 +402,28 @@ public class AllocateRequestPBImpl extends AllocateRequest {
this.release.add(convertFromProtoFormat(c));
}
}
-
+
+ @Override
+ public String getTrackingUrl() {
+ AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.trackingUrl != null) {
+ return this.trackingUrl;
+ }
+ if (p.hasTrackingUrl()) {
+ this.trackingUrl = p.getTrackingUrl();
+ }
+ return this.trackingUrl;
+ }
+
+ @Override
+ public void setTrackingUrl(String trackingUrl) {
+ maybeInitBuilder();
+ if (trackingUrl == null) {
+ builder.clearTrackingUrl();
+ }
+ this.trackingUrl = trackingUrl;
+ }
+
private void addReleasesToProto() {
maybeInitBuilder();
builder.clearRelease();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 43f73e4..4cd5925 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -401,7 +401,7 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
// Send the status update to the appAttempt.
getRmContext().getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
- .getProgress()));
+ .getProgress(), request.getTrackingUrl()));
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 32f275f..3ec9c49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1823,6 +1823,26 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Update progress
appAttempt.progress = statusUpdateEvent.getProgress();
+ // Update tracking url if changed and save it to state store
+ String newTrackingUrl = statusUpdateEvent.getTrackingUrl();
+ if (newTrackingUrl != null &&
+ !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) {
+ appAttempt.originalTrackingUrl = newTrackingUrl;
+ ApplicationAttemptStateData attemptState = ApplicationAttemptStateData
+ .newInstance(appAttempt.applicationAttemptId,
+ appAttempt.getMasterContainer(),
+ appAttempt.rmContext.getStateStore()
+ .getCredentialsFromAppAttempt(appAttempt),
+ appAttempt.startTime, appAttempt.recoveredFinalState,
+ newTrackingUrl, appAttempt.getDiagnostics(), null,
+ ContainerExitStatus.INVALID, appAttempt.getFinishTime(),
+ appAttempt.attemptMetrics.getAggregateAppResourceUsage()
+ .getResourceUsageSecondsMap(),
+ appAttempt.attemptMetrics.getPreemptedResourceSecondsMap());
+ appAttempt.rmContext.getStateStore()
+ .updateApplicationAttemptState(attemptState);
+ }
+
// Ping to AMLivelinessMonitor
appAttempt.rmContext.getAMLivelinessMonitor().receivedPing(
statusUpdateEvent.getApplicationAttemptId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java
index b1b63b1..1b7442d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptStatusupdateEvent.java
@@ -25,15 +25,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
public class RMAppAttemptStatusupdateEvent extends RMAppAttemptEvent {
private final float progress;
+ private final String trackingUrl;
public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId,
float progress) {
+ this(appAttemptId, progress, null);
+ }
+
+ public RMAppAttemptStatusupdateEvent(ApplicationAttemptId appAttemptId,
+ float progress, String trackingUrl) {
super(appAttemptId, RMAppAttemptEventType.STATUS_UPDATE);
this.progress = progress;
+ this.trackingUrl = trackingUrl;
}
public float getProgress() {
return this.progress;
}
+ public String getTrackingUrl() {
+ return this.trackingUrl;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 9696741..562ba5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -956,4 +956,38 @@ public class TestApplicationMasterService {
fail("Cannot find RMContainer");
}
}
+
+ @Test(timeout = 300000)
+ public void testUpdateTrackingUrl() throws Exception {
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ // Register node1
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+ RMApp app1 = rm.submitApp(2048);
+
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+ Assert.assertEquals("N/A", rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getOriginalTrackingUrl());
+
+ AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
+ String newTrackingUrl = "hadoop.apache.org";
+ allocateRequest.setTrackingUrl(newTrackingUrl);
+
+ am1.allocate(allocateRequest);
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getOriginalTrackingUrl());
+
+ // Send it again
+ am1.allocate(allocateRequest);
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getOriginalTrackingUrl());
+ rm.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3e06a5dc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 07c5268..9aa5c53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -2698,6 +2698,51 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
rm2.stop();
}
+ @Test(timeout = 20000)
+ public void testRMRestartAfterUpdateTrackingUrl() throws Exception {
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm.getRMStateStore();
+
+ // Register node1
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
+
+ RMApp app1 = rm.submitApp(2048);
+
+ nm1.nodeHeartbeat(true);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
+ String newTrackingUrl = "hadoop.apache.org";
+ allocateRequest.setTrackingUrl(newTrackingUrl);
+
+ am1.allocate(allocateRequest);
+ // Check in-memory and stored tracking url
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getOriginalTrackingUrl());
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getCurrentAppAttempt()
+ .getOriginalTrackingUrl());
+ Assert.assertEquals(newTrackingUrl, memStore.getState()
+ .getApplicationState().get(app1.getApplicationId())
+ .getAttempt(attempt1.getAppAttemptId()).getFinalTrackingUrl());
+
+ // Start new RM, should recover updated tracking url
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getOriginalTrackingUrl());
+ Assert.assertEquals(newTrackingUrl, rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getCurrentAppAttempt()
+ .getOriginalTrackingUrl());
+
+ rm.stop();
+ rm2.stop();
+ }
+
private Credentials getCreds() throws IOException {
Credentials ts = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org