You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/08/12 23:26:19 UTC
svn commit: r1513258 [4/10] - in
/hadoop/common/branches/YARN-321/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-...
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationReportPBImpl.java Mon Aug 12 21:25:49 2013
@@ -46,6 +46,7 @@ public class ApplicationReportPBImpl ext
private ApplicationId applicationId;
private ApplicationAttemptId currentApplicationAttemptId;
private Token clientToAMToken = null;
+ private Token amRmToken = null;
public ApplicationReportPBImpl() {
builder = ApplicationReportProto.newBuilder();
@@ -228,7 +229,20 @@ public class ApplicationReportPBImpl ext
}
return p.getApplicationType();
}
-
+
+ @Override
+ public Token getAMRMToken() {
+ ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (amRmToken != null) {
+ return amRmToken;
+ }
+ if (!p.hasAmRmToken()) {
+ return null;
+ }
+ amRmToken = convertFromProtoFormat(p.getAmRmToken());
+ return amRmToken;
+ }
+
@Override
public void setApplicationId(ApplicationId applicationId) {
maybeInitBuilder();
@@ -377,6 +391,15 @@ public class ApplicationReportPBImpl ext
builder.setProgress(progress);
}
+ @Override
+ public void setAMRMToken(Token amRmToken) {
+ maybeInitBuilder();
+ if (amRmToken == null) {
+ builder.clearAmRmToken();
+ }
+ this.amRmToken = amRmToken;
+ }
+
public ApplicationReportProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -420,6 +443,11 @@ public class ApplicationReportPBImpl ext
builder.getClientToAmToken())) {
builder.setClientToAmToken(convertToProtoFormat(this.clientToAMToken));
}
+ if (this.amRmToken != null
+ && !((TokenPBImpl) this.amRmToken).getProto().equals(
+ builder.getAmRmToken())) {
+ builder.setAmRmToken(convertToProtoFormat(this.amRmToken));
+ }
}
private void mergeLocalToProto() {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java Mon Aug 12 21:25:49 2013
@@ -166,9 +166,11 @@ public class RpcServerFactoryPBImpl impl
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService, String portRangeConfig) throws IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
- RPC.Server server = RPC.getServer(pbProtocol, blockingService,
- addr.getHostName(), addr.getPort(), numHandlers, false, conf,
- secretManager, portRangeConfig);
+ RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol)
+ .setInstance(blockingService).setBindAddress(addr.getHostName())
+ .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)
+ .setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)
+ .build();
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server;
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java Mon Aug 12 21:25:49 2013
@@ -41,6 +41,7 @@ public class AggregatedLogDeletionServic
private static final Log LOG = LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null;
+ private long checkIntervalMsecs;
static class LogDeletionTask extends TimerTask {
private Configuration conf;
@@ -133,37 +134,71 @@ public class AggregatedLogDeletionServic
@Override
protected void serviceStart() throws Exception {
+ scheduleLogDeletionTask();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopTimer();
+ super.serviceStop();
+ }
+
+ private void setLogAggCheckIntervalMsecs(long retentionSecs) {
+ Configuration conf = getConfig();
+ checkIntervalMsecs = 1000 * conf
+ .getLong(
+ YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+ if (checkIntervalMsecs <= 0) {
+ // when unspecified compute check interval as 1/10th of retention
+ checkIntervalMsecs = (retentionSecs * 1000) / 10;
+ }
+ }
+
+ public void refreshLogRetentionSettings() {
+ if (getServiceState() == STATE.STARTED) {
+ Configuration conf = createConf();
+ setConfig(conf);
+ stopTimer();
+ scheduleLogDeletionTask();
+ } else {
+ LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started");
+ }
+ }
+
+ private void scheduleLogDeletionTask() {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
- //Log aggregation is not enabled so don't bother
+ // Log aggregation is not enabled so don't bother
return;
}
- long retentionSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
+ long retentionSecs = conf.getLong(
+ YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
- if(retentionSecs < 0) {
- LOG.info("Log Aggregation deletion is disabled because retention is" +
- " too small (" + retentionSecs + ")");
+ if (retentionSecs < 0) {
+ LOG.info("Log Aggregation deletion is disabled because retention is"
+ + " too small (" + retentionSecs + ")");
return;
}
- long checkIntervalMsecs = 1000 * conf.getLong(
- YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
- if (checkIntervalMsecs <= 0) {
- // when unspecified compute check interval as 1/10th of retention
- checkIntervalMsecs = (retentionSecs * 1000) / 10;
- }
+ setLogAggCheckIntervalMsecs(retentionSecs);
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
- super.serviceStart();
}
- @Override
- protected void serviceStop() throws Exception {
- if(timer != null) {
+ private void stopTimer() {
+ if (timer != null) {
timer.cancel();
}
- super.serviceStop();
+ }
+
+ public long getCheckIntervalMsecs() {
+ return checkIntervalMsecs;
+ }
+
+ protected Configuration createConf() {
+ return new Configuration();
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocolPB.java Mon Aug 12 21:25:49 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.proto.Reso
@Private
@Unstable
@ProtocolInfo(
- protocolName = "org.apache.hadoop.yarn.api.ResourceManagerAdministrationProtocolPB",
+ protocolName = "org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB",
protocolVersion = 1)
public interface ResourceManagerAdministrationProtocolPB extends ResourceManagerAdministrationProtocolService.BlockingInterface {
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java Mon Aug 12 21:25:49 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
import junit.framework.Assert;
@@ -31,12 +33,13 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -104,12 +107,15 @@ public class TestContainerLaunchRPC {
TestRPC.newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
- StartContainerRequest scRequest = recordFactory
- .newRecordInstance(StartContainerRequest.class);
- scRequest.setContainerLaunchContext(containerLaunchContext);
- scRequest.setContainerToken(containerToken);
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
try {
- proxy.startContainer(scRequest);
+ proxy.startContainers(allRequests);
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
Assert.assertEquals("Error, exception is not: "
@@ -129,17 +135,8 @@ public class TestContainerLaunchRPC {
private ContainerStatus status = null;
@Override
- public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws YarnException {
- GetContainerStatusResponse response = recordFactory
- .newRecordInstance(GetContainerStatusResponse.class);
- response.setStatus(status);
- return response;
- }
-
- @Override
- public StartContainerResponse startContainer(StartContainerRequest request)
- throws YarnException, IOException {
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException, IOException {
try {
// make the thread sleep to look like its not going to respond
Thread.sleep(10000);
@@ -151,11 +148,22 @@ public class TestContainerLaunchRPC {
}
@Override
- public StopContainerResponse stopContainer(StopContainerRequest request)
- throws YarnException {
+ public StopContainersResponse
+ stopContainers(StopContainersRequest requests) throws YarnException,
+ IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnException(e);
}
+
+ @Override
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws YarnException, IOException {
+ List<ContainerStatus> list = new ArrayList<ContainerStatus>();
+ list.add(status);
+ GetContainerStatusesResponse response =
+ GetContainerStatusesResponse.newInstance(list, null);
+ // Return the constructed response; returning null here would discard
+ // the statuses list built above and hand callers a null response.
+ return response;
+ }
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Mon Aug 12 21:25:49 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import junit.framework.Assert;
@@ -33,13 +35,14 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -122,9 +125,6 @@ public class TestRPC {
ApplicationAttemptId.newInstance(applicationId, 0);
ContainerId containerId =
ContainerId.newInstance(applicationAttemptId, 100);
- StartContainerRequest scRequest =
- recordFactory.newRecordInstance(StartContainerRequest.class);
- scRequest.setContainerLaunchContext(containerLaunchContext);
NodeId nodeId = NodeId.newInstance("localhost", 1234);
Resource resource = Resource.newInstance(1234, 2);
ContainerTokenIdentifier containerTokenIdentifier =
@@ -132,22 +132,32 @@ public class TestRPC {
resource, System.currentTimeMillis() + 10000, 42, 42);
Token containerToken = newContainerToken(nodeId, "password".getBytes(),
containerTokenIdentifier);
- scRequest.setContainerToken(containerToken);
- proxy.startContainer(scRequest);
-
- GetContainerStatusRequest gcsRequest =
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
- gcsRequest.setContainerId(containerId);
- GetContainerStatusResponse response = proxy.getContainerStatus(gcsRequest);
- ContainerStatus status = response.getStatus();
-
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ proxy.startContainers(allRequests);
+
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(containerId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ GetContainerStatusesResponse response =
+ proxy.getContainerStatuses(gcsRequest);
+ List<ContainerStatus> statuses = response.getContainerStatuses();
+
//test remote exception
boolean exception = false;
try {
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
- stopRequest.setContainerId(containerId);
- proxy.stopContainer(stopRequest);
- } catch (YarnException e) {
+ StopContainersRequest stopRequest =
+ recordFactory.newRecordInstance(StopContainersRequest.class);
+ stopRequest.setContainerIds(containerIds);
+ proxy.stopContainers(stopRequest);
+ } catch (YarnException e) {
exception = true;
Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
@@ -158,46 +168,51 @@ public class TestRPC {
Assert.assertTrue(exception);
server.stop();
- Assert.assertNotNull(status);
- Assert.assertEquals(ContainerState.RUNNING, status.getState());
+ Assert.assertNotNull(statuses.get(0));
+ Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
}
public class DummyContainerManager implements ContainerManagementProtocol {
- private ContainerStatus status = null;
-
+ private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+
@Override
- public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request)
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request)
throws YarnException {
- GetContainerStatusResponse response =
- recordFactory.newRecordInstance(GetContainerStatusResponse.class);
- response.setStatus(status);
+ GetContainerStatusesResponse response =
+ recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
+ response.setContainerStatuses(statuses);
return response;
}
@Override
- public StartContainerResponse startContainer(StartContainerRequest request)
- throws YarnException {
- Token containerToken = request.getContainerToken();
- ContainerTokenIdentifier tokenId = null;
-
- try {
- tokenId = newContainerTokenIdentifier(containerToken);
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException {
+ StartContainersResponse response =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
+ for (StartContainerRequest request : requests.getStartContainerRequests()) {
+ Token containerToken = request.getContainerToken();
+ ContainerTokenIdentifier tokenId = null;
+
+ try {
+ tokenId = newContainerTokenIdentifier(containerToken);
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
+ ContainerStatus status =
+ recordFactory.newRecordInstance(ContainerStatus.class);
+ status.setState(ContainerState.RUNNING);
+ status.setContainerId(tokenId.getContainerID());
+ status.setExitStatus(0);
+ statuses.add(status);
+
}
- StartContainerResponse response =
- recordFactory.newRecordInstance(StartContainerResponse.class);
- status = recordFactory.newRecordInstance(ContainerStatus.class);
- status.setState(ContainerState.RUNNING);
- status.setContainerId(tokenId.getContainerID());
- status.setExitStatus(0);
return response;
}
@Override
- public StopContainerResponse stopContainer(StopContainerRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest request)
throws YarnException {
Exception e = new Exception(EXCEPTION_MSG,
new Exception(EXCEPTION_CAUSE));
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestApplicatonReport.java Mon Aug 12 21:25:49 2013
@@ -46,6 +46,7 @@ public class TestApplicatonReport {
appReport2.setCurrentApplicationAttemptId(null);
Assert.assertNull(appReport2.getCurrentApplicationAttemptId());
Assert.assertNotSame(appReport2, appReport3);
+ Assert.assertNull(appReport1.getAMRMToken());
}
protected static ApplicationReport createApplicationReport(
@@ -57,7 +58,7 @@ public class TestApplicatonReport {
ApplicationReport.newInstance(appId, appAttemptId, "user", "queue",
"appname", "host", 124, null, YarnApplicationState.FINISHED,
"diagnostics", "url", 0, 0, FinalApplicationStatus.SUCCEEDED, null,
- "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE);
+ "N/A", 0.53789f, YarnConfiguration.DEFAULT_APPLICATION_TYPE, null);
return appReport;
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java Mon Aug 12 21:25:49 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Assert;
import static org.mockito.Mockito.*;
@@ -129,6 +130,99 @@ public class TestAggregatedLogDeletionSe
}
@Test
+ public void testRefreshLogRetentionSettings() throws IOException {
+ long now = System.currentTimeMillis();
+ //time before 2000 sec
+ long before2000Secs = now - (2000 * 1000);
+ //time before 50 sec
+ long before50Secs = now - (50 * 1000);
+ String root = "mockfs://foo/";
+ String remoteRootLogDir = root + "tmp/logs";
+ String suffix = "logs";
+ final Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+ conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ "1");
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+ Path rootPath = new Path(root);
+ FileSystem rootFs = rootPath.getFileSystem(conf);
+ FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
+
+ Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+ Path userDir = new Path(remoteRootLogPath, "me");
+ FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+ userDir);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+ new FileStatus[] { userDirStatus });
+
+ Path userLogDir = new Path(userDir, suffix);
+
+ //Set time last modified of app1Dir directory and its files to before2000Secs
+ Path app1Dir = new Path(userLogDir, "application_1_1");
+ FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
+ app1Dir);
+
+ //Set time last modified of app1Dir directory and its files to before50Secs
+ Path app2Dir = new Path(userLogDir, "application_1_2");
+ FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+ app2Dir);
+
+ when(mockFs.listStatus(userLogDir)).thenReturn(
+ new FileStatus[] { app1DirStatus, app2DirStatus });
+
+ Path app1Log1 = new Path(app1Dir, "host1");
+ FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
+ app1Log1);
+
+ when(mockFs.listStatus(app1Dir)).thenReturn(
+ new FileStatus[] { app1Log1Status });
+
+ Path app2Log1 = new Path(app2Dir, "host1");
+ FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
+ app2Log1);
+
+ when(mockFs.listStatus(app2Dir)).thenReturn(
+ new FileStatus[] { app2Log1Status });
+
+ AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
+ @Override
+ protected Configuration createConf() {
+ return conf;
+ }
+ };
+
+ deletionSvc.init(conf);
+ deletionSvc.start();
+
+ //app1Dir would be deleted since its done above log retention period
+ verify(mockFs, timeout(10000)).delete(app1Dir, true);
+ //app2Dir is not expected to be deleted since its below the threshold
+ verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
+
+ //Now,lets change the confs
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ "2");
+ //We have not called refreshLogSettings,hence don't expect to see the changed conf values
+ Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+
+ //refresh the log settings
+ deletionSvc.refreshLogRetentionSettings();
+
+ //Check interval time should reflect the new value
+ Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+ //app2Dir should be deleted since it falls above the threshold
+ verify(mockFs, timeout(10000)).delete(app2Dir, true);
+ deletionSvc.stop();
+ }
+
+ @Test
public void testCheckInterval() throws Exception {
long RETENTION_SECS = 10 * 24 * 3600;
long now = System.currentTimeMillis();
@@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionSe
new AggregatedLogDeletionService();
deletionSvc.init(conf);
deletionSvc.start();
-
+
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
verify(mockFs, never()).delete(app1Dir, true);
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-yarn-server-common</name>
<properties>
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java Mon Aug 12 21:25:49 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.api.impl.pb.client;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.server.api
import com.google.protobuf.ServiceException;
-public class ResourceTrackerPBClientImpl implements ResourceTracker {
+public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
private ResourceTrackerPB proxy;
@@ -50,7 +51,14 @@ private ResourceTrackerPB proxy;
proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerPB.class, clientVersion, addr, conf);
}
-
+
+ @Override
+ public void close() {
+ if(this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ }
+ }
+
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Mon Aug 12 21:25:49 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -232,6 +231,11 @@ public class BuilderUtils {
return newToken(Token.class, identifier, kind, password, service);
}
+ public static Token newAMRMToken(byte[] identifier, String kind,
+ byte[] password, String service) {
+ return newToken(Token.class, identifier, kind, password, service);
+ }
+
@Private
@VisibleForTesting
public static Token newContainerToken(NodeId nodeId,
@@ -308,7 +312,7 @@ public class BuilderUtils {
String url, long startTime, long finishTime,
FinalApplicationStatus finalStatus,
ApplicationResourceUsageReport appResources, String origTrackingUrl,
- float progress, String appType) {
+ float progress, String appType, Token amRmToken) {
ApplicationReport report = recordFactory
.newRecordInstance(ApplicationReport.class);
report.setApplicationId(applicationId);
@@ -329,6 +333,7 @@ public class BuilderUtils {
report.setOriginalTrackingUrl(origTrackingUrl);
report.setProgress(progress);
report.setApplicationType(appType);
+ report.setAMRMToken(amRmToken);
return report;
}
@@ -390,20 +395,6 @@ public class BuilderUtils {
url.setFile(file);
return url;
}
-
- public static AllocateRequest newAllocateRequest(
- ApplicationAttemptId applicationAttemptId, int responseID,
- float appProgress, List<ResourceRequest> resourceAsk,
- List<ContainerId> containersToBeReleased) {
- AllocateRequest allocateRequest = recordFactory
- .newRecordInstance(AllocateRequest.class);
- allocateRequest.setApplicationAttemptId(applicationAttemptId);
- allocateRequest.setResponseId(responseID);
- allocateRequest.setProgress(appProgress);
- allocateRequest.setAskList(resourceAsk);
- allocateRequest.setReleaseList(containersToBeReleased);
- return allocateRequest;
- }
public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers,
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java Mon Aug 12 21:25:49 2013
@@ -22,12 +22,12 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -59,10 +59,4 @@ public class YarnServerBuilderUtils {
}
return response;
}
-
- public static SerializedException newSerializedException(Throwable e) {
- SerializedException se = Records.newRecord(SerializedException.class);
- se.init(e);
- return se;
- }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java Mon Aug 12 21:25:49 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
@@ -45,7 +46,6 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.SerializedExceptionPBImpl;
import org.junit.Test;
/**
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Mon Aug 12 21:25:49 2013
@@ -19,12 +19,12 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.3.0-SNAPSHOT</version>
<name>hadoop-yarn-server-nodemanager</name>
<properties>
@@ -143,6 +143,10 @@
<name>application.submitter</name>
<value>${application.submitter}</value>
</property>
+ <property>
+ <name>yarn.log.dir</name>
+ <value>${maven.project.build.directory}/logs</value>
+ </property>
</systemPropertyVariables>
<excludes>
<exclude>**/TestFSDownload.java</exclude>
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -39,8 +39,10 @@ import org.apache.hadoop.util.Shell.Shel
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
public abstract class ContainerExecutor implements Configurable {
@@ -298,15 +300,16 @@ public abstract class ContainerExecutor
}
public static class DelayedProcessKiller extends Thread {
+ private Container container;
private final String user;
private final String pid;
private final long delay;
private final Signal signal;
private final ContainerExecutor containerExecutor;
- public DelayedProcessKiller(String user, String pid, long delay,
- Signal signal,
- ContainerExecutor containerExecutor) {
+ public DelayedProcessKiller(Container container, String user, String pid,
+ long delay, Signal signal, ContainerExecutor containerExecutor) {
+ this.container = container;
this.user = user;
this.pid = pid;
this.delay = delay;
@@ -323,7 +326,11 @@ public abstract class ContainerExecutor
} catch (InterruptedException e) {
return;
} catch (IOException e) {
- LOG.warn("Exception when killing task " + pid, e);
+ String message = "Exception when user " + user + " killing task " + pid
+ + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
+ LOG.warn(message);
+ container.handle(new ContainerDiagnosticsUpdateEvent(container
+ .getContainerId(), message));
}
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -203,11 +204,23 @@ public class DefaultContainerExecutor ex
return -1;
}
int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from task is : " + exitCode);
- String message = shExec.getOutput();
- logOutput(message);
- container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
- message));
+ LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
+ // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
+ // terminated/killed forcefully. In all other cases, log the
+ // container-executor's output
+ if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
+ && exitCode != ExitCode.TERMINATED.getExitCode()) {
+ LOG.warn("Exception from container-launch with container ID: "
+ + containerId + " and exit code: " + exitCode , e);
+ logOutput(shExec.getOutput());
+ String diagnostics = "Exception from container-launch: \n"
+ + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+ container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+ diagnostics));
+ } else {
+ container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
+ "Container killed on request. Exit code is " + exitCode));
+ }
return exitCode;
} finally {
; //
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Mon Aug 12 21:25:49 2013
@@ -18,23 +18,30 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DeletionService extends AbstractService {
@@ -42,7 +49,8 @@ public class DeletionService extends Abs
private int debugDelay;
private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched;
- private final FileContext lfs = getLfs();
+ private static final FileContext lfs = getLfs();
+
static final FileContext getLfs() {
try {
return FileContext.getLocalFSFileContext();
@@ -68,11 +76,23 @@ public class DeletionService extends Abs
public void delete(String user, Path subDir, Path... baseDirs) {
// TODO if parent owned by NM, rename within parent inline
if (debugDelay != -1) {
- sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
- TimeUnit.SECONDS);
+ if (baseDirs == null || baseDirs.length == 0) {
+ sched.schedule(new FileDeletionTask(this, user, subDir, null),
+ debugDelay, TimeUnit.SECONDS);
+ } else {
+ sched.schedule(
+ new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
+ debugDelay, TimeUnit.SECONDS);
+ }
}
}
-
+
+ public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
+ if (debugDelay != -1) {
+ sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
+ }
+ }
+
@Override
protected void serviceInit(Configuration conf) throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder()
@@ -118,46 +138,184 @@ public class DeletionService extends Abs
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
- private class FileDeletion implements Runnable {
- final String user;
- final Path subDir;
- final Path[] baseDirs;
- FileDeletion(String user, Path subDir, Path[] baseDirs) {
+ public static class FileDeletionTask implements Runnable {
+ private final String user;
+ private final Path subDir;
+ private final List<Path> baseDirs;
+ private final AtomicInteger numberOfPendingPredecessorTasks;
+ private final Set<FileDeletionTask> successorTaskSet;
+ private final DeletionService delService;
+ // By default all tasks will start as success=true; however if any of
+ // the dependent task fails then it will be marked as false in
+ // fileDeletionTaskFinished().
+ private boolean success;
+
+ private FileDeletionTask(DeletionService delService, String user,
+ Path subDir, List<Path> baseDirs) {
+ this.delService = delService;
this.user = user;
this.subDir = subDir;
this.baseDirs = baseDirs;
+ this.successorTaskSet = new HashSet<FileDeletionTask>();
+ this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
+ success = true;
+ }
+
+ /**
+ * increments and returns pending predecessor task count
+ */
+ public int incrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.incrementAndGet();
+ }
+
+ /**
+ * decrements and returns pending predecessor task count
+ */
+ public int decrementAndGetPendingPredecessorTasks() {
+ return numberOfPendingPredecessorTasks.decrementAndGet();
}
+
+ @VisibleForTesting
+ public String getUser() {
+ return this.user;
+ }
+
+ @VisibleForTesting
+ public Path getSubDir() {
+ return this.subDir;
+ }
+
+ @VisibleForTesting
+ public List<Path> getBaseDirs() {
+ return this.baseDirs;
+ }
+
+ public synchronized void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public synchronized boolean getSucess() {
+ return this.success;
+ }
+
@Override
public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this);
+ }
+ boolean error = false;
if (null == user) {
- if (baseDirs == null || baseDirs.length == 0) {
+ if (baseDirs == null || baseDirs.size() == 0) {
LOG.debug("NM deleting absolute path : " + subDir);
try {
lfs.delete(subDir, true);
} catch (IOException e) {
+ error = true;
LOG.warn("Failed to delete " + subDir);
}
- return;
- }
- for (Path baseDir : baseDirs) {
- Path del = subDir == null? baseDir : new Path(baseDir, subDir);
- LOG.debug("NM deleting path : " + del);
- try {
- lfs.delete(del, true);
- } catch (IOException e) {
- LOG.warn("Failed to delete " + subDir);
+ } else {
+ for (Path baseDir : baseDirs) {
+ Path del = subDir == null? baseDir : new Path(baseDir, subDir);
+ LOG.debug("NM deleting path : " + del);
+ try {
+ lfs.delete(del, true);
+ } catch (IOException e) {
+ error = true;
+ LOG.warn("Failed to delete " + subDir);
+ }
}
}
} else {
try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
- exec.deleteAsUser(user, subDir, baseDirs);
+ if (baseDirs == null || baseDirs.size() == 0) {
+ delService.exec.deleteAsUser(user, subDir, (Path[])null);
+ } else {
+ delService.exec.deleteAsUser(user, subDir,
+ baseDirs.toArray(new Path[0]));
+ }
} catch (IOException e) {
+ error = true;
LOG.warn("Failed to delete as user " + user, e);
} catch (InterruptedException e) {
+ error = true;
LOG.warn("Failed to delete as user " + user, e);
}
}
+ if (error) {
+ setSuccess(!error);
+ }
+ fileDeletionTaskFinished();
}
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
+ sb.append(" user : ").append(this.user);
+ sb.append(" subDir : ").append(
+ subDir == null ? "null" : subDir.toString());
+ sb.append(" baseDir : ");
+ if (baseDirs == null || baseDirs.size() == 0) {
+ sb.append("null");
+ } else {
+ for (Path baseDir : baseDirs) {
+ sb.append(baseDir.toString()).append(',');
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * If there is a task dependency between say tasks 1,2,3 such that
+ * task2 and task3 can be started only after task1 then we should define
+ * task2 and task3 as successor tasks for task1.
+ * Note: task dependencies must be defined before the task is scheduled.
+ * @param successorTask
+ */
+ public synchronized void addFileDeletionTaskDependency(
+ FileDeletionTask successorTask) {
+ if (successorTaskSet.add(successorTask)) {
+ successorTask.incrementAndGetPendingPredecessorTasks();
+ }
+ }
+
+ /*
+ * This is called when
+ * 1) Current file deletion task ran and finished.
+ * 2) This can even be called directly by a predecessor task if one of its
+ * dependent tasks has failed, marking its success = false.
+ */
+ private synchronized void fileDeletionTaskFinished() {
+ Iterator<FileDeletionTask> successorTaskI =
+ this.successorTaskSet.iterator();
+ while (successorTaskI.hasNext()) {
+ FileDeletionTask successorTask = successorTaskI.next();
+ if (!success) {
+ successorTask.setSuccess(success);
+ }
+ int count = successorTask.decrementAndGetPendingPredecessorTasks();
+ if (count == 0) {
+ if (successorTask.getSucess()) {
+ successorTask.delService.scheduleFileDeletionTask(successorTask);
+ } else {
+ successorTask.fileDeletionTaskFinished();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method to create file deletion task. To be used only if we need
+ * a way to define dependencies between deletion tasks.
+ * @param user user on whose behalf this task is supposed to run
+ * @param subDir sub directory as required in
+ * {@link DeletionService#delete(String, Path, Path...)}
+ * @param baseDirs base directories as required in
+ * {@link DeletionService#delete(String, Path, Path...)}
+ */
+ public FileDeletionTask createFileDeletionTask(String user, Path subDir,
+ Path[] baseDirs) {
+ return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
}
-}
+}
\ No newline at end of file
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Mon Aug 12 21:25:49 2013
@@ -147,7 +147,8 @@ public class LinuxContainerExecutor exte
shExec.execute();
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from container is : " + exitCode);
+ LOG.warn("Exit code from container executor initialization is : "
+ + exitCode, e);
logOutput(shExec.getOutput());
throw new IOException("Linux container executor not configured properly"
+ " (error=" + exitCode + ")", e);
@@ -204,10 +205,11 @@ public class LinuxContainerExecutor exte
}
} catch (ExitCodeException e) {
int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from container is : " + exitCode);
+ LOG.warn("Exit code from container " + locId + " startLocalizer is : "
+ + exitCode, e);
logOutput(shExec.getOutput());
- throw new IOException("App initialization failed (" + exitCode +
- ") with output: " + shExec.getOutput(), e);
+ throw new IOException("Application " + appId + " initialization failed" +
+ " (exitCode=" + exitCode + ") with output: " + shExec.getOutput(), e);
}
}
@@ -256,19 +258,18 @@ public class LinuxContainerExecutor exte
return ExitCode.TERMINATED.getExitCode();
}
} catch (ExitCodeException e) {
-
if (null == shExec) {
return -1;
}
-
int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from container is : " + exitCode);
+ LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the
// container-executor's output
if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
- LOG.warn("Exception from container-launch : ", e);
+ LOG.warn("Exception from container-launch with container ID: "
+ + containerId + " and exit code: " + exitCode , e);
logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
@@ -310,9 +311,12 @@ public class LinuxContainerExecutor exte
if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
return false;
}
+ LOG.warn("Error in signalling container " + pid + " with " + signal
+ + "; exit = " + ret_code, e);
logOutput(shExec.getOutput());
- throw new IOException("Problem signalling container " + pid + " with " +
- signal + "; exit = " + ret_code);
+ throw new IOException("Problem signalling container " + pid + " with "
+ + signal + "; output: " + shExec.getOutput() + " and exitCode: "
+ + ret_code, e);
}
return true;
}
@@ -346,13 +350,10 @@ public class LinuxContainerExecutor exte
}
} catch (IOException e) {
int exitCode = shExec.getExitCode();
- LOG.warn("Exit code from container is : " + exitCode);
- if (exitCode != 0) {
- LOG.error("DeleteAsUser for " + dir.toUri().getPath()
- + " returned with non-zero exit code" + exitCode);
- LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
- logOutput(shExec.getOutput());
- }
+ LOG.error("DeleteAsUser for " + dir.toUri().getPath()
+ + " returned with exit code: " + exitCode, e);
+ LOG.error("Output from LinuxContainerExecutor's deleteAsUser follows:");
+ logOutput(shExec.getOutput());
}
}
@@ -373,9 +374,10 @@ public class LinuxContainerExecutor exte
shExec.execute();
} catch (IOException e) {
int ret_code = shExec.getExitCode();
+ LOG.warn("Exception in LinuxContainerExecutor mountCgroups ", e);
logOutput(shExec.getOutput());
throw new IOException("Problem mounting cgroups " + cgroupKVs +
- "; exit code = " + ret_code, e);
+ "; exit code = " + ret_code + " and output: " + shExec.getOutput(), e);
}
}
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Aug 12 21:25:49 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
@@ -229,6 +230,15 @@ public class NodeManager extends Composi
return "NodeManager";
}
+ protected void shutDown() {
+ new Thread() {
+ @Override
+ public void run() {
+ NodeManager.this.stop();
+ }
+ }.start();
+ }
+
protected void resyncWithRM() {
//we do not want to block dispatcher thread here
new Thread() {
@@ -265,6 +275,8 @@ public class NodeManager extends Composi
while (!containers.isEmpty()
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
+ //To remove done containers in NM context
+ nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on shutdown",
@@ -276,7 +288,6 @@ public class NodeManager extends Composi
while (!containers.isEmpty()) {
try {
Thread.sleep(1000);
- //to remove done containers from the map
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync",
@@ -409,7 +420,7 @@ public class NodeManager extends Composi
public void handle(NodeManagerEvent event) {
switch (event.getType()) {
case SHUTDOWN:
- stop();
+ shutDown();
break;
case RESYNC:
resyncWithRM();
@@ -446,4 +457,10 @@ public class NodeManager extends Composi
Configuration conf = new YarnConfiguration();
nodeManager.initAndStartNodeManager(conf, false);
}
+
+ @VisibleForTesting
+ @Private
+ public NodeStatusUpdater getNodeStatusUpdater() {
+ return nodeStatusUpdater;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Mon Aug 12 21:25:49 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public interface NodeStatusUpdater extends Service {
@@ -28,4 +29,8 @@ public interface NodeStatusUpdater exten
NodeStatus getNodeStatusAndUpdateContainersInContext();
long getRMIdentifier();
+
+ public boolean isContainerRecentlyStopped(ContainerId containerId);
+
+ public void clearFinishedContainersFromCache();
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Aug 12 21:25:49 2013
@@ -19,11 +19,12 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,9 +49,9 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -67,6 +69,9 @@ import com.google.common.annotations.Vis
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
+ public static final String YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS =
+ YarnConfiguration.NM_PREFIX + "duration-to-track-stopped-containers";
+
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
private final Object heartbeatMonitor = new Object();
@@ -77,7 +82,6 @@ public class NodeStatusUpdaterImpl exten
private NodeId nodeId;
private long nextHeartBeatInterval;
private ResourceTracker resourceTracker;
- private InetSocketAddress rmAddress;
private Resource totalResource;
private int httpPort;
private volatile boolean isStopped;
@@ -88,12 +92,13 @@ public class NodeStatusUpdaterImpl exten
private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random();
+ // It will be used to track recently stopped containers on node manager.
+ private final Map<ContainerId, Long> recentlyStoppedContainers;
+ // Duration for which to track recently stopped containers.
+ private long durationToTrackStoppedContainers;
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
- private long rmConnectWaitMS;
- private long rmConnectionRetryIntervalMS;
- private boolean waitForEver;
private Runnable statusUpdaterRunnable;
private Thread statusUpdater;
@@ -106,15 +111,12 @@ public class NodeStatusUpdaterImpl exten
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
+ this.recentlyStoppedContainers =
+ new LinkedHashMap<ContainerId, Long>();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
- this.rmAddress = conf.getSocketAddr(
- YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
-
int memoryMb =
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
@@ -137,11 +139,27 @@ public class NodeStatusUpdaterImpl exten
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ // Default duration to track stopped containers on nodemanager is 10 minutes.
+ // This should not be assigned very large value as it will remember all the
+ // containers stopped during that time.
+ durationToTrackStoppedContainers =
+ conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+ 600000);
+ if (durationToTrackStoppedContainers < 0) {
+ String message = "Invalid configuration for "
+ + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
+ + "value is 10Min(600000).";
+ LOG.error(message);
+ throw new YarnException(message);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
+ + durationToTrackStoppedContainers);
+ }
+ super.serviceInit(conf);
LOG.info("Initialized nodemanager for " + nodeId + ":" +
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" virtual-cores=" + virtualCores);
-
- super.serviceInit(conf);
}
@Override
@@ -153,6 +171,7 @@ public class NodeStatusUpdaterImpl exten
try {
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
+ this.resourceTracker = getRMClient();
registerWithRM();
super.serviceStart();
startStatusUpdater();
@@ -167,6 +186,7 @@ public class NodeStatusUpdaterImpl exten
protected void serviceStop() throws Exception {
// Interrupt the updater.
this.isStopped = true;
+ stopRMProxy();
super.serviceStop();
}
@@ -188,6 +208,13 @@ public class NodeStatusUpdaterImpl exten
}
}
+ @VisibleForTesting
+ protected void stopRMProxy() {
+ if(this.resourceTracker != null) {
+ RPC.stopProxy(this.resourceTracker);
+ }
+ }
+
@Private
protected boolean isTokenKeepAliveEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
@@ -195,93 +222,22 @@ public class NodeStatusUpdaterImpl exten
&& UserGroupInformation.isSecurityEnabled();
}
- protected ResourceTracker getRMClient() {
+ @VisibleForTesting
+ protected ResourceTracker getRMClient() throws IOException {
Configuration conf = getConfig();
- YarnRPC rpc = YarnRPC.create(conf);
- return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
- conf);
+ return ServerRMProxy.createRMProxy(conf, ResourceTracker.class);
}
@VisibleForTesting
protected void registerWithRM() throws YarnException, IOException {
- Configuration conf = getConfig();
- rmConnectWaitMS =
- conf.getInt(
- YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
- YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
- * 1000;
- rmConnectionRetryIntervalMS =
- conf.getLong(
- YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
- YarnConfiguration
- .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
- * 1000;
-
- if(rmConnectionRetryIntervalMS < 0) {
- throw new YarnRuntimeException("Invalid Configuration. " +
- YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
- " should not be negative.");
- }
-
- waitForEver = (rmConnectWaitMS == -1000);
-
- if(! waitForEver) {
- if(rmConnectWaitMS < 0) {
- throw new YarnRuntimeException("Invalid Configuration. " +
- YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
- " can be -1, but can not be other negative numbers");
- }
-
- //try connect once
- if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
- LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
- + " is smaller than "
- + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
- + ". Only try connect once.");
- rmConnectWaitMS = 0;
- }
- }
-
- int rmRetryCount = 0;
- long waitStartTime = System.currentTimeMillis();
-
RegisterNodeManagerRequest request =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
- RegisterNodeManagerResponse regNMResponse;
-
- while(true) {
- try {
- rmRetryCount++;
- LOG.info("Connecting to ResourceManager at " + this.rmAddress
- + ". current no. of attempts is " + rmRetryCount);
- this.resourceTracker = getRMClient();
- regNMResponse =
- this.resourceTracker.registerNodeManager(request);
- this.rmIdentifier = regNMResponse.getRMIdentifier();
- break;
- } catch(Throwable e) {
- LOG.warn("Trying to connect to ResourceManager, " +
- "current no. of failed attempts is "+rmRetryCount);
- if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
- || waitForEver) {
- try {
- LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
- + " seconds before next connection retry to RM");
- Thread.sleep(rmConnectionRetryIntervalMS);
- } catch(InterruptedException ex) {
- //done nothing
- }
- } else {
- String errorMessage = "Failed to Connect to RM, " +
- "no. of failed attempts is "+rmRetryCount;
- LOG.error(errorMessage,e);
- throw new YarnRuntimeException(errorMessage,e);
- }
- }
- }
+ RegisterNodeManagerResponse regNMResponse =
+ resourceTracker.registerNodeManager(request);
+ this.rmIdentifier = regNMResponse.getRMIdentifier();
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@@ -360,7 +316,11 @@ public class NodeStatusUpdaterImpl exten
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
-
+ // Adding to finished containers cache. Cache will keep it around at
+ // least for #durationToTrackStoppedContainers duration. In the
+ // subsequent call to stop container it will get removed from cache.
+ addStoppedContainersToCache(containerId);
+
LOG.info("Removed completed container " + containerId);
}
}
@@ -410,6 +370,46 @@ public class NodeStatusUpdaterImpl exten
}
}
+ public boolean isContainerRecentlyStopped(ContainerId containerId) {
+ synchronized (recentlyStoppedContainers) {
+ return recentlyStoppedContainers.containsKey(containerId);
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public void addStoppedContainersToCache(ContainerId containerId) {
+ synchronized (recentlyStoppedContainers) {
+ removeVeryOldStoppedContainersFromCache();
+ recentlyStoppedContainers.put(containerId,
+ System.currentTimeMillis() + durationToTrackStoppedContainers);
+ }
+ }
+
+ @Override
+ public void clearFinishedContainersFromCache() {
+ synchronized (recentlyStoppedContainers) {
+ recentlyStoppedContainers.clear();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public void removeVeryOldStoppedContainersFromCache() {
+ synchronized (recentlyStoppedContainers) {
+ long currentTime = System.currentTimeMillis();
+ Iterator<ContainerId> i =
+ recentlyStoppedContainers.keySet().iterator();
+ while (i.hasNext()) {
+ if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+ i.remove();
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
@Override
public long getRMIdentifier() {
return this.rmIdentifier;
@@ -426,8 +426,6 @@ public class NodeStatusUpdaterImpl exten
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- int rmRetryCount = 0;
- long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
@@ -440,31 +438,7 @@ public class NodeStatusUpdaterImpl exten
request
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
- while (!isStopped) {
- try {
- rmRetryCount++;
- response = resourceTracker.nodeHeartbeat(request);
- break;
- } catch (Throwable e) {
- LOG.warn("Trying to heartbeat to ResourceManager, "
- + "current no. of failed attempts is " + rmRetryCount);
- if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
- || waitForEver) {
- try {
- LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
- + " seconds before next heartbeat to RM");
- Thread.sleep(rmConnectionRetryIntervalMS);
- } catch(InterruptedException ex) {
- //done nothing
- }
- } else {
- String errorMessage = "Failed to heartbeat to RM, " +
- "no. of failed attempts is "+rmRetryCount;
- LOG.error(errorMessage,e);
- throw new YarnRuntimeException(errorMessage,e);
- }
- }
- }
+ response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
@@ -481,7 +455,7 @@ public class NodeStatusUpdaterImpl exten
}
if (response.getNodeAction() == NodeAction.RESYNC) {
LOG.warn("Node is out of sync with ResourceManager,"
- + " hence rebooting.");
+ + " hence resyncing.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
// Invalidate the RMIdentifier while resync
@@ -508,12 +482,13 @@ public class NodeStatusUpdaterImpl exten
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
}
- } catch (YarnRuntimeException e) {
+ } catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
- throw e;
+ throw new YarnRuntimeException(e);
} catch (Throwable e) {
+
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
@@ -550,4 +525,6 @@ public class NodeStatusUpdaterImpl exten
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}
+
+
}
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalResourceStatus.java Mon Aug 12 21:25:49 2013
@@ -18,8 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
public interface LocalResourceStatus {
public LocalResource getResource();
Modified: hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java?rev=1513258&r1=1513257&r2=1513258&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalResourceStatusPBImpl.java Mon Aug 12 21:25:49 2013
@@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
@@ -28,8 +30,6 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceStatusTypeProto;
-import org.apache.hadoop.yarn.server.api.records.SerializedException;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;