You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/26 17:43:25 UTC
[2/2] hadoop git commit: YARN-3040. Make putEntities operation be
aware of the app's context. Contributed by Zhijie Shen
YARN-3040. Make putEntities operation be aware of the app's context. Contributed by Zhijie Shen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db2f0238
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db2f0238
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db2f0238
Branch: refs/heads/YARN-2928
Commit: db2f0238915d6e1a5b85c463426b5e072bd4698d
Parents: dc12cad
Author: Junping Du <ju...@apache.org>
Authored: Thu Mar 26 09:59:32 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Mar 26 09:59:32 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 1 +
.../applications/distributedshell/Client.java | 27 +++-
.../distributedshell/TestDistributedShell.java | 125 +++++++++-------
.../yarn/util/timeline/TimelineUtils.java | 16 +++
.../api/CollectorNodemanagerProtocol.java | 16 +++
...ollectorNodemanagerProtocolPBClientImpl.java | 20 +++
...llectorNodemanagerProtocolPBServiceImpl.java | 21 +++
.../GetTimelineCollectorContextRequest.java | 37 +++++
.../GetTimelineCollectorContextResponse.java | 46 ++++++
...etTimelineCollectorContextRequestPBImpl.java | 127 +++++++++++++++++
...tTimelineCollectorContextResponsePBImpl.java | 141 +++++++++++++++++++
.../proto/collectornodemanager_protocol.proto | 1 +
.../yarn_server_common_service_protos.proto | 9 ++
.../java/org/apache/hadoop/yarn/TestRPC.java | 39 +++++
.../collectormanager/NMCollectorService.java | 18 ++-
.../containermanager/ContainerManagerImpl.java | 14 +-
.../application/Application.java | 4 +
.../application/ApplicationImpl.java | 17 ++-
.../application/TestApplication.java | 3 +-
.../yarn/server/nodemanager/webapp/MockApp.java | 10 ++
.../nodemanager/webapp/TestNMWebServices.java | 2 +-
.../resourcemanager/amlauncher/AMLauncher.java | 22 ++-
.../timelineservice/RMTimelineCollector.java | 7 +
.../TestTimelineServiceClientIntegration.java | 19 ++-
.../collector/AppLevelTimelineCollector.java | 33 ++++-
.../PerNodeTimelineCollectorsAuxService.java | 2 +-
.../collector/TimelineCollector.java | 19 ++-
.../collector/TimelineCollectorContext.java | 81 +++++++++++
.../collector/TimelineCollectorManager.java | 32 ++++-
.../collector/TimelineCollectorWebService.java | 2 +-
.../storage/FileSystemTimelineWriterImpl.java | 69 +++++----
.../timelineservice/storage/TimelineWriter.java | 9 +-
...TestPerNodeTimelineCollectorsAuxService.java | 43 ++++--
.../collector/TestTimelineCollectorManager.java | 41 +++++-
.../TestFileSystemTimelineWriterImpl.java | 22 ++-
36 files changed, 955 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 111de71..d62bd0e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -41,6 +41,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R
via junping_du)
+ YARN-3040. Make putEntities operation be aware of the app's context. (Zhijie Shen
+ via junping_du)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 13cdcbe..36dcd5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -124,6 +124,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_PREFIX = "yarn.resourcemanager.";
public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
+ public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster";
public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 934515e..e962e71 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
import org.apache.commons.cli.CommandLine;
@@ -183,6 +185,9 @@ public class Client {
// Timeline domain writer access control
private String modifyACLs = null;
+ private String flowId = null;
+ private String flowRunId = null;
+
// Command line options
private Options opts;
@@ -256,7 +261,8 @@ public class Client {
opts.addOption("shell_args", true, "Command line args for the shell script." +
"Multiple args can be separated by empty space.");
opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
- opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+ opts.addOption("shell_env", true,
+ "Environment for shell script. Specified as env_key=env_val pairs");
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
@@ -283,6 +289,10 @@ public class Client {
+ "modify the timeline entities in the given domain");
opts.addOption("create", false, "Flag to indicate whether to create the "
+ "domain specified with -domain.");
+ opts.addOption("flow", true, "ID of the flow which the distributed shell "
+ + "app belongs to");
+ opts.addOption("flow_run", true, "ID of the flowrun which the distributed "
+ + "shell app belongs to");
opts.addOption("help", false, "Print usage");
opts.addOption("node_label_expression", true,
"Node label expression to determine the nodes"
@@ -442,6 +452,12 @@ public class Client {
}
}
+ if (cliParser.hasOption("flow")) {
+ flowId = cliParser.getOptionValue("flow");
+ }
+ if (cliParser.hasOption("flow_run")) {
+ flowRunId = cliParser.getOptionValue("flow_run");
+ }
return true;
}
@@ -533,6 +549,15 @@ public class Client {
.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
+ Set<String> tags = new HashSet<String>();
+ if (flowId != null) {
+ tags.add(TimelineUtils.generateFlowIdTag(flowId));
+ }
+ if (flowRunId != null) {
+ tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
+ }
+ appContext.setApplicationTags(tags);
+
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 0af050c..1de3b68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -33,12 +33,14 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -161,20 +164,26 @@ public class TestDistributedShell {
@Test(timeout=90000)
public void testDSShellWithDomain() throws Exception {
- testDSShell(true, "v1");
+ testDSShell(true, "v1", true);
}
@Test(timeout=90000)
public void testDSShellWithoutDomain() throws Exception {
- testDSShell(false, "v1");
+ testDSShell(false, "v1", true);
}
@Test(timeout=90000)
- public void testDSShellWithoutDomainV2() throws Exception {
- testDSShell(false, "v2");
+ public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
+ testDSShell(false, "v2", true);
}
- public void testDSShell(boolean haveDomain, String timelineVersion)
+ @Test(timeout=90000)
+ public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
+ testDSShell(false, "v2", false);
+ }
+
+ public void testDSShell(boolean haveDomain, String timelineVersion,
+ boolean defaultFlow)
throws Exception {
String[] args = {
"--jar",
@@ -212,6 +221,15 @@ public class TestDistributedShell {
};
isTestingTimelineV2 = true;
args = mergeArgs(args, timelineArgs);
+ if (!defaultFlow) {
+ String[] flowArgs = {
+ "--flow",
+ "test_flow_id",
+ "--flow_run",
+ "12345678"
+ };
+ args = mergeArgs(args, flowArgs);
+ }
LOG.info("Setup: Using timeline v2!");
}
@@ -271,7 +289,7 @@ public class TestDistributedShell {
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
- checkTimelineV2(haveDomain, appId);
+ checkTimelineV2(haveDomain, appId, defaultFlow);
}
}
@@ -320,53 +338,58 @@ public class TestDistributedShell {
}
}
- private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
- // For PoC check in /tmp/ YARN-3264
- String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+ private void checkTimelineV2(
+ boolean haveDomain, ApplicationId appId, boolean defaultFlow)
+ throws Exception {
+ // For PoC check in /tmp/timeline_service_data YARN-3264
+ String tmpRoot =
+ FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ + "/entities/";
File tmpRootFolder = new File(tmpRoot);
- Assert.assertTrue(tmpRootFolder.isDirectory());
-
- // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
- String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
-
- File entityFolder = new File(outputDirApp);
- Assert.assertTrue(entityFolder.isDirectory());
-
- // there will be at least one attempt, look for that file
- String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
- + "_000" + appId.getId() + "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- String appAttemptFileName = outputDirApp + appTimestampFileName;
- File appAttemptFile = new File(appAttemptFileName);
- Assert.assertTrue(appAttemptFile.exists());
-
- String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
- File containerFolder = new File(outputDirContainer);
- Assert.assertTrue(containerFolder.isDirectory());
-
- String containerTimestampFileName = "container_"
- + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_01_000002.thist";
- String containerFileName = outputDirContainer + containerTimestampFileName;
- File containerFile = new File(containerFileName);
- Assert.assertTrue(containerFile.exists());
- String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
- + "_";
- deleteAppFiles(new File(outputDirApp), appTimeStamp);
- deleteAppFiles(new File(outputDirContainer), appTimeStamp);
- tmpRootFolder.delete();
- }
-
- private void deleteAppFiles(File rootDir, String appTimeStamp) {
- boolean deleted = false;
- File[] listOfFiles = rootDir.listFiles();
- for (File f1 : listOfFiles) {
- // list all attempts for this app and delete them
- if (f1.getName().contains(appTimeStamp)){
- deleted = f1.delete();
- Assert.assertTrue(deleted);
- }
+ try {
+ Assert.assertTrue(tmpRootFolder.isDirectory());
+
+ // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+ String outputDirApp = tmpRoot +
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+ UserGroupInformation.getCurrentUser().getShortUserName() +
+ (defaultFlow ? "/" +
+ TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+ "/0/" : "/test_flow_id/12345678/") +
+ appId.toString() + "/DS_APP_ATTEMPT/";
+
+ File entityFolder = new File(outputDirApp);
+ Assert.assertTrue(entityFolder.isDirectory());
+
+ // there will be at least one attempt, look for that file
+ String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+ + "_000" + appId.getId() + "_000001"
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ String appAttemptFileName = outputDirApp + appTimestampFileName;
+ File appAttemptFile = new File(appAttemptFileName);
+ Assert.assertTrue(appAttemptFile.exists());
+
+ String outputDirContainer = tmpRoot +
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+ UserGroupInformation.getCurrentUser().getShortUserName() +
+ (defaultFlow ? "/" +
+ TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+ "/0/" : "/test_flow_id/12345678/") +
+ appId.toString() + "/DS_CONTAINER/";
+ File containerFolder = new File(outputDirContainer);
+ Assert.assertTrue(containerFolder.isDirectory());
+
+ String containerTimestampFileName = "container_"
+ + appId.getClusterTimestamp() + "_000" + appId.getId()
+ + "_01_000002.thist";
+ String containerFileName = outputDirContainer + containerTimestampFileName;
+ File containerFile = new File(containerFileName);
+ Assert.assertTrue(containerFile.exists());
+ String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ + "_";
+ } finally {
+ FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 02b5eb4..772c92a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
@@ -40,6 +41,9 @@ import org.codehaus.jackson.map.ObjectMapper;
@Evolving
public class TimelineUtils {
+ public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG";
+ public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
+
private static ObjectMapper mapper;
static {
@@ -105,4 +109,16 @@ public class TimelineUtils {
getTimelineTokenServiceAddress(conf);
return SecurityUtil.buildTokenService(timelineServiceAddr);
}
+
+ public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) {
+ return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+ }
+
+ public static String generateFlowIdTag(String flowId) {
+ return FLOW_ID_TAG_PREFIX + ":" + flowId;
+ }
+
+ public static String generateFlowRunIdTag(String flowRunId) {
+ return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
index 26c121a..d23c04a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
@@ -54,4 +56,18 @@ public interface CollectorNodemanagerProtocol {
ReportNewCollectorInfoRequest request)
throws YarnException, IOException;
+ /**
+ * <p>
+ * The collector needs to get the context information including user, flow
+ * and flow run ID to associate with every incoming put-entity request.
+ * </p>
+ * @param request the request for the collector context information of
+ * the given application
+ * @return the response carrying the user, flow and flow run ID
+ * @throws YarnException
+ * @throws IOException
+ */
+ GetTimelineCollectorContextResponse getTimelineCollectorContext(
+ GetTimelineCollectorContextRequest request)
+ throws YarnException, IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
index 276a540..b9e17f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
@@ -30,11 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
@@ -85,6 +90,21 @@ public class CollectorNodemanagerProtocolPBClientImpl implements
}
@Override
+ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+ GetTimelineCollectorContextRequest request)
+ throws YarnException, IOException {
+ GetTimelineCollectorContextRequestProto requestProto =
+ ((GetTimelineCollectorContextRequestPBImpl) request).getProto();
+ try {
+ return new GetTimelineCollectorContextResponsePBImpl(
+ proxy.getTimelineCollectorContext(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
public void close() {
if (this.proxy != null) {
RPC.stopProxy(this.proxy);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
index 3f42732..21fb270 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
@@ -20,11 +20,16 @@ package org.apache.hadoop.yarn.server.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
@@ -56,4 +61,20 @@ public class CollectorNodemanagerProtocolPBServiceImpl implements
}
}
+ @Override
+ public GetTimelineCollectorContextResponseProto getTimelineCollectorContext(
+ RpcController controller,
+ GetTimelineCollectorContextRequestProto proto) throws ServiceException {
+ GetTimelineCollectorContextRequestPBImpl request =
+ new GetTimelineCollectorContextRequestPBImpl(proto);
+ try {
+ GetTimelineCollectorContextResponse response =
+ real.getTimelineCollectorContext(request);
+ return ((GetTimelineCollectorContextResponsePBImpl)response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
new file mode 100644
index 0000000..604a40b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextRequest {
+
+ public static GetTimelineCollectorContextRequest newInstance(
+ ApplicationId appId) {
+ GetTimelineCollectorContextRequest request =
+ Records.newRecord(GetTimelineCollectorContextRequest.class);
+ request.setApplicationId(appId);
+ return request;
+ }
+
+ public abstract ApplicationId getApplicationId();
+
+ public abstract void setApplicationId(ApplicationId appId);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
new file mode 100644
index 0000000..1558e2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextResponse {
+
+ public static GetTimelineCollectorContextResponse newInstance(
+ String userId, String flowId, String flowRunId) {
+ GetTimelineCollectorContextResponse response =
+ Records.newRecord(GetTimelineCollectorContextResponse.class);
+ response.setUserId(userId);
+ response.setFlowId(flowId);
+ response.setFlowRunId(flowRunId);
+ return response;
+ }
+
+ public abstract String getUserId();
+
+ public abstract void setUserId(String userId);
+
+ public abstract String getFlowId();
+
+ public abstract void setFlowId(String flowId);
+
+ public abstract String getFlowRunId();
+
+ public abstract void setFlowRunId(String flowRunId);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
new file mode 100644
index 0000000..b53b55b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link GetTimelineCollectorContextRequest}.
+ *
+ * The record's state is read from the immutable {@code proto} while
+ * {@code viaProto} is true and from the mutable {@code builder} otherwise.
+ * The application ID is additionally cached in {@code appId} and merged into
+ * the builder whenever the proto is (re)built.
+ */
+public class GetTimelineCollectorContextRequestPBImpl extends
+    GetTimelineCollectorContextRequest {
+
+  GetTimelineCollectorContextRequestProto proto =
+      GetTimelineCollectorContextRequestProto.getDefaultInstance();
+  GetTimelineCollectorContextRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  // Locally cached application ID; null when unset or not yet read from proto.
+  private ApplicationId appId = null;
+
+  /** Creates an empty request backed by a fresh builder. */
+  public GetTimelineCollectorContextRequestPBImpl() {
+    builder = GetTimelineCollectorContextRequestProto.newBuilder();
+  }
+
+  /**
+   * Creates a request that wraps an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public GetTimelineCollectorContextRequestPBImpl(
+      GetTimelineCollectorContextRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached fields and returns the resulting proto message.
+   *
+   * @return the proto message reflecting the current state of this record
+   */
+  public GetTimelineCollectorContextRequestProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when they are of exactly the same class and their
+   * proto messages are equal.
+   *
+   * The class check is an exact comparison: the previous
+   * {@code isAssignableFrom} test also accepted instances of a supertype
+   * (for example a plain {@code Object}) and then failed with a
+   * {@code ClassCastException} on the cast instead of returning false.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((GetTimelineCollectorContextRequestPBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  /** Copies the cached application ID, if any, into the builder. */
+  private void mergeLocalToBuilder() {
+    if (appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+  }
+
+  /** Rebuilds {@code proto} from the builder plus the locally cached fields. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /**
+   * Switches the record to builder mode, seeding a new builder from the
+   * current proto when the proto was the active source or no builder exists.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * @return the application ID, or null when neither the local cache nor the
+   *         underlying message has one
+   */
+  @Override
+  public ApplicationId getApplicationId() {
+    if (this.appId == null) {
+      GetTimelineCollectorContextRequestProtoOrBuilder source =
+          viaProto ? proto : builder;
+      if (source.hasAppId()) {
+        // Cache the converted value so later calls skip the conversion.
+        this.appId = convertFromProtoFormat(source.getAppId());
+      }
+    }
+    return this.appId;
+  }
+
+  /**
+   * @param appId the application ID to store; null clears the field in the
+   *        builder as well as the local cache
+   */
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null) {
+      builder.clearAppId();
+    }
+    this.appId = appId;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      YarnProtos.ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  // Assumes the given ID is an ApplicationIdPBImpl; any other implementation
+  // fails the cast.
+  private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
new file mode 100644
index 0000000..6dc1f77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link GetTimelineCollectorContextResponse}.
+ *
+ * The record's state is read from the immutable {@code proto} while
+ * {@code viaProto} is true and from the mutable {@code builder} otherwise.
+ */
+public class GetTimelineCollectorContextResponsePBImpl extends
+    GetTimelineCollectorContextResponse {
+
+  GetTimelineCollectorContextResponseProto proto =
+      GetTimelineCollectorContextResponseProto.getDefaultInstance();
+  GetTimelineCollectorContextResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  /** Creates an empty response backed by a fresh builder. */
+  public GetTimelineCollectorContextResponsePBImpl() {
+    builder = GetTimelineCollectorContextResponseProto.newBuilder();
+  }
+
+  /**
+   * Creates a response that wraps an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public GetTimelineCollectorContextResponsePBImpl(
+      GetTimelineCollectorContextResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * @return the proto message reflecting the current state of this record
+   */
+  public GetTimelineCollectorContextResponseProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when they are of exactly the same class and their
+   * proto messages are equal.
+   *
+   * The class check is an exact comparison: the previous
+   * {@code isAssignableFrom} test also accepted instances of a supertype
+   * (for example a plain {@code Object}) and then failed with a
+   * {@code ClassCastException} on the cast instead of returning false.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((GetTimelineCollectorContextResponsePBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  /** Rebuilds {@code proto} from the builder. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /**
+   * Switches the record to builder mode, seeding a new builder from the
+   * current proto when the proto was the active source or no builder exists.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** @return the user ID, or null when the field is not set */
+  @Override
+  public String getUserId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder source =
+        viaProto ? proto : builder;
+    return source.hasUserId() ? source.getUserId() : null;
+  }
+
+  /** @param userId the user ID to store; null clears the field */
+  @Override
+  public void setUserId(String userId) {
+    maybeInitBuilder();
+    if (userId != null) {
+      builder.setUserId(userId);
+    } else {
+      builder.clearUserId();
+    }
+  }
+
+  /** @return the flow ID, or null when the field is not set */
+  @Override
+  public String getFlowId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder source =
+        viaProto ? proto : builder;
+    return source.hasFlowId() ? source.getFlowId() : null;
+  }
+
+  /** @param flowId the flow ID to store; null clears the field */
+  @Override
+  public void setFlowId(String flowId) {
+    maybeInitBuilder();
+    if (flowId != null) {
+      builder.setFlowId(flowId);
+    } else {
+      builder.clearFlowId();
+    }
+  }
+
+  /** @return the flow run ID, or null when the field is not set */
+  @Override
+  public String getFlowRunId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder source =
+        viaProto ? proto : builder;
+    return source.hasFlowRunId() ? source.getFlowRunId() : null;
+  }
+
+  /** @param flowRunId the flow run ID to store; null clears the field */
+  @Override
+  public void setFlowRunId(String flowRunId) {
+    maybeInitBuilder();
+    if (flowRunId != null) {
+      builder.setFlowRunId(flowRunId);
+    } else {
+      builder.clearFlowRunId();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
index 654a9f2..8665274 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
@@ -26,4 +26,5 @@ import "yarn_server_common_service_protos.proto";
service CollectorNodemanagerProtocolService {
rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
+ rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 0086bae..f733371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -87,6 +87,15 @@ message ReportNewCollectorInfoRequestProto {
message ReportNewCollectorInfoResponseProto {
}
+// Request of the getTimelineCollectorContext RPC: carries the (optional) ID
+// of the application whose context is being asked for.
+message GetTimelineCollectorContextRequestProto {
+ optional ApplicationIdProto appId = 1;
+}
+
+// Response of the getTimelineCollectorContext RPC: three optional strings
+// holding the user ID, flow ID and flow run ID.
+message GetTimelineCollectorContextResponseProto {
+ optional string user_id = 1;
+ optional string flow_id = 2;
+ optional string flow_run_id = 3;
+}
message NMContainerStatusProto {
optional ContainerIdProto container_id = 1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index cfc3dc6..3c9f57b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
@@ -166,6 +168,31 @@ public class TestRPC {
Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
}
+ // Verify request with a valid app ID
+ try {
+ GetTimelineCollectorContextRequest request =
+ GetTimelineCollectorContextRequest.newInstance(
+ ApplicationId.newInstance(0, 1));
+ GetTimelineCollectorContextResponse response =
+ proxy.getTimelineCollectorContext(request);
+ Assert.assertEquals("test_user_id", response.getUserId());
+ Assert.assertEquals("test_flow_id", response.getFlowId());
+ Assert.assertEquals("test_flow_run_id", response.getFlowRunId());
+ } catch (YarnException | IOException e) {
+ Assert.fail("RPC call failured is not expected here.");
+ }
+
+ // Verify request with an invalid app ID
+ try {
+ GetTimelineCollectorContextRequest request =
+ GetTimelineCollectorContextRequest.newInstance(
+ ApplicationId.newInstance(0, 2));
+ proxy.getTimelineCollectorContext(request);
+ Assert.fail("RPC call failured is expected here.");
+ } catch (YarnException | IOException e) {
+ Assert.assertTrue(e instanceof YarnException);
+ Assert.assertTrue(e.getMessage().contains("The application is not found."));
+ }
server.stop();
}
@@ -340,6 +367,18 @@ public class TestRPC {
recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
return response;
}
+
+ /**
+  * Test stub: answers with fixed context values for the application whose
+  * numeric ID is 1 and rejects every other application with a YarnException.
+  */
+ @Override
+ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+     GetTimelineCollectorContextRequest request)
+     throws YarnException, IOException {
+   boolean knownApp = request.getApplicationId().getId() == 1;
+   if (!knownApp) {
+     throw new YarnException("The application is not found.");
+   }
+   return GetTimelineCollectorContextResponse.newInstance(
+       "test_user_id", "test_flow_id", "test_flow_run_id");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index 009fa63..6ccea84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -30,13 +30,17 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
public class NMCollectorService extends CompositeService implements
CollectorNodemanagerProtocol {
@@ -93,7 +97,7 @@ public class NMCollectorService extends CompositeService implements
@Override
public ReportNewCollectorInfoResponse reportNewCollectorInfo(
- ReportNewCollectorInfoRequest request) throws IOException {
+ ReportNewCollectorInfoRequest request) throws YarnException, IOException {
List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
Map<ApplicationId, String> newCollectorsMap =
@@ -107,4 +111,16 @@ public class NMCollectorService extends CompositeService implements
return ReportNewCollectorInfoResponse.newInstance();
}
+ /**
+  * Looks up the application named in the request among the applications held
+  * by the NM context and returns its user, flow ID and flow run ID.
+  *
+  * @param request request carrying the ID of the application to look up
+  * @return response built from the application's user, flow ID and flow run ID
+  * @throws YarnException if the request carries no application ID, or the
+  *         application is not present in the NM context
+  * @throws IOException declared in the signature; not thrown directly by this
+  *         implementation
+  */
+ @Override
+ public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+     GetTimelineCollectorContextRequest request)
+     throws YarnException, IOException {
+   ApplicationId appId = request.getApplicationId();
+   // The app ID is optional on the wire, so it can legitimately be null here.
+   // Reject it explicitly rather than handing a null key to the map lookup
+   // (NOTE(review): the applications map presumably rejects null keys).
+   if (appId == null) {
+     throw new YarnException("Application ID is missing from the request.");
+   }
+   Application app = context.getApplications().get(appId);
+   if (app == null) {
+     throw new YarnException("Application " + appId +
+         " doesn't exist on NM.");
+   }
+   return GetTimelineCollectorContextResponse.newInstance(
+       app.getUser(), app.getFlowId(), app.getFlowRunId());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index acac600..6ac15a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
@@ -293,8 +294,9 @@ public class ContainerManagerImpl extends CompositeService implements
}
LOG.info("Recovering application " + appId);
- ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
- creds, context);
+ //TODO: Recover flow and flow run ID
+ ApplicationImpl app = new ApplicationImpl(
+ dispatcher, p.getUser(), null, null, appId, creds, context);
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@@ -849,8 +851,12 @@ public class ContainerManagerImpl extends CompositeService implements
try {
if (!serviceStopped) {
// Create the application
- Application application =
- new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+ String flowId = launchContext.getEnvironment().get(
+ TimelineUtils.FLOW_ID_TAG_PREFIX);
+ String flowRunId = launchContext.getEnvironment().get(
+ TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+ Application application = new ApplicationImpl(
+ dispatcher, user, flowId, flowRunId, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
index b1571e9..decd17d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
@@ -35,4 +35,8 @@ public interface Application extends EventHandler<ApplicationEvent> {
ApplicationState getApplicationState();
+ /**
+  * Returns the flow ID recorded for this application.
+  * NOTE(review): may be null, e.g. recovered applications are currently
+  * created with a null flow ID; confirm callers handle that.
+  */
+ String getFlowId();
+
+ /**
+  * Returns the flow run ID recorded for this application.
+  * NOTE(review): may be null, e.g. recovered applications are currently
+  * created with a null flow run ID; confirm callers handle that.
+  */
+ String getFlowRunId();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 5f84b4f..ceaafe8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -65,6 +65,8 @@ public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
+ final String flowId;
+ final String flowRunId;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
@@ -80,10 +82,13 @@ public class ApplicationImpl implements Application {
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
- public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
- Credentials credentials, Context context) {
+ public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
+ String flowRunId, ApplicationId appId, Credentials credentials,
+ Context context) {
this.dispatcher = dispatcher;
this.user = user;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
@@ -488,4 +493,12 @@ public class ApplicationImpl implements Application {
this.readLock.unlock();
}
}
+
+ /** Returns the flow ID given to this application at construction time. */
+ @Override
+ public String getFlowId() {
+   return flowId;
+ }
+
+ /** Returns the flow run ID given to this application at construction time. */
+ @Override
+ public String getFlowRunId() {
+   return flowRunId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 370a207..5303df5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -530,7 +530,8 @@ public class TestApplication {
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
- app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
+ app = new ApplicationImpl(
+ dispatcher, this.user, null, null, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
index 4e13010..35b95ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
@@ -39,6 +39,9 @@ public class MockApp implements Application {
Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
ApplicationState appState;
Application app;
+ String flowId;
+ String flowRunId;
+
public MockApp(int uniqId) {
this("mockUser", 1234, uniqId);
@@ -77,4 +80,11 @@ public class MockApp implements Application {
public void handle(ApplicationEvent event) {}
+ /** Returns the value of this mock's {@code flowId} field. */
+ @Override
+ public String getFlowId() {
+   return flowId;
+ }
+
+ /** Returns the value of this mock's {@code flowRunId} field. */
+ @Override
+ public String getFlowRunId() {
+   return flowRunId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 5a89e74..1c7ea54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -327,7 +327,7 @@ public class TestNMWebServices extends JerseyTestBase {
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
- appId, null, nmContext));
+ null, null, appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 0dd9ba1..1a8bb9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* The launch of the AM itself.
@@ -217,7 +218,26 @@ public class AMLauncher implements Runnable {
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
-
+ // Set flow context info
+ for (String tag :
+ rmContext.getRMApps().get(applicationId).getApplicationTags()) {
+ if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") ||
+ tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) {
+ String value = tag.substring(
+ TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
+ if (!value.isEmpty()) {
+ environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value);
+ }
+ }
+ if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
+ tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
+ String value = tag.substring(
+ TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
+ if (!value.isEmpty()) {
+ environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value);
+ }
+ }
+ }
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
if (container.getTokens() != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
index 22743d6..4ea7a03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
/**
* This class is responsible for posting application and appattempt lifecycle
@@ -87,6 +88,12 @@ public class RMTimelineCollector extends TimelineCollector {
LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
}
}
+
+ /**
+ * Placeholder for the RM-side collector context: currently always returns
+ * {@code null}.
+ *
+ * NOTE(review): {@code TimelineCollector#putEntities} calls getters on the
+ * returned context without a null check, so putting entities through this
+ * collector would fail with a NullPointerException until the TODO below is
+ * resolved.
+ */
+ @Override
+ protected TimelineCollectorContext getTimelineEntityContext() {
+ // TODO address in YARN-3390.
+ return null;
+ }
/**
* EventHandler implementation which forward events to SystemMetricsPublisher.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index fab131c..c8b9625 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -20,20 +20,27 @@ package org.apache.hadoop.yarn.server.timelineservice;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+
public class TestTimelineServiceClientIntegration {
private static TimelineCollectorManager collectorManager;
private static PerNodeTimelineCollectorsAuxService auxService;
@@ -86,7 +93,17 @@ public class TestTimelineServiceClientIntegration {
@Override
protected CollectorNodemanagerProtocol getNMCollectorService() {
- return mock(CollectorNodemanagerProtocol.class);
+ // Return a mocked NM collector service whose getTimelineCollectorContext
+ // call yields a response with null user, flow and flow run IDs.
+ CollectorNodemanagerProtocol protocol =
+ mock(CollectorNodemanagerProtocol.class);
+ try {
+ GetTimelineCollectorContextResponse response =
+ GetTimelineCollectorContextResponse.newInstance(null, null, null);
+ when(protocol.getTimelineCollectorContext(any(
+ GetTimelineCollectorContextRequest.class))).thenReturn(response);
+ } catch (YarnException | IOException e) {
+ // Report the cause instead of failing with no diagnostic.
+ fail("Unexpected exception while stubbing getTimelineCollectorContext: "
+ + e);
+ }
+ return protocol;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 7d59876..60ddde5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,9 +18,14 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* Service that handles writes to the timeline service and writes them to the
@@ -31,16 +36,29 @@ import org.apache.hadoop.conf.Configuration;
@Private
@Unstable
public class AppLevelTimelineCollector extends TimelineCollector {
- private final String applicationId;
- // TODO define key metadata such as flow metadata, user, and queue
+ private final ApplicationId appId;
+ private final TimelineCollectorContext context;
- public AppLevelTimelineCollector(String applicationId) {
- super(AppLevelTimelineCollector.class.getName() + " - " + applicationId);
- this.applicationId = applicationId;
+ /**
+ * Creates the collector for one application.
+ *
+ * @param appId ID of the application this collector serves; must not be null
+ * @throws NullPointerException if {@code appId} is null
+ */
+ public AppLevelTimelineCollector(ApplicationId appId) {
+ // super(...) must be the first statement, so the null check is performed
+ // inside its argument; otherwise appId would be dereferenced before it is
+ // validated and the check's message could never be reported.
+ super(AppLevelTimelineCollector.class.getName() + " - " +
+ Preconditions.checkNotNull(appId, "AppId shouldn't be null"));
+ this.appId = appId;
+ context = new TimelineCollectorContext();
}
+ /**
+ * Fills the collector context with default values before delegating to the
+ * parent's {@code serviceInit}: the cluster ID read from the configuration,
+ * the current user's short name, a default flow ID generated from the
+ * application ID, flow run ID "0", and the application ID itself.
+ *
+ * @param conf configuration the cluster ID is read from
+ * @throws Exception declared by the overridden method; anything thrown while
+ * resolving the current user or by the parent's init propagates unchanged
+ */
@Override
protected void serviceInit(Configuration conf) throws Exception {
+ context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID));
+ // Set the default values, which will be updated with an RPC call to get the
+ // context info from NM.
+ // Current user usually is not the app user, but keep this field non-null
+ context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
+ // Use app ID to generate a default flow ID for orphan app
+ context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
+ // Set the flow run ID to 0 if it's an orphan app
+ context.setFlowRunId("0");
+ context.setAppId(appId.toString());
super.serviceInit(conf);
}
@@ -54,4 +72,9 @@ public class AppLevelTimelineCollector extends TimelineCollector {
super.serviceStop();
}
+ /**
+ * Returns the context object created in the constructor. The same mutable
+ * instance is handed out on every call, so changes made through its setters
+ * are visible to later callers.
+ */
+ @Override
+ protected TimelineCollectorContext getTimelineEntityContext() {
+ return context;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 59ecef1..2017d01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -95,7 +95,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
*/
public boolean addApplication(ApplicationId appId) {
AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(appId.toString());
+ new AppLevelTimelineCollector(appId);
return (collectorManager.putIfAbsent(appId, collector)
== collector);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 6e20e69..677feb1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage.
@@ -83,21 +84,24 @@ public abstract class TimelineCollector extends CompositeService {
*
* This method should be reserved for selected critical entities and events.
* For normal voluminous writes one should use the async method
- * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+ * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
*
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
*/
- public TimelineWriteResponse postEntities(TimelineEntities entities,
+ public TimelineWriteResponse putEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
- LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+ LOG.debug("putEntities(entities=" + entities + ", callerUgi="
+ callerUgi + ")");
}
- return writer.write(entities);
+ TimelineCollectorContext context = getTimelineEntityContext();
+ return writer.write(context.getClusterId(), context.getUserId(),
+ context.getFlowId(), context.getFlowRunId(), context.getAppId(),
+ entities);
}
/**
@@ -111,12 +115,15 @@ public abstract class TimelineCollector extends CompositeService {
* @param entities entities to post
* @param callerUgi the caller UGI
*/
- public void postEntitiesAsync(TimelineEntities entities,
+ public void putEntitiesAsync(TimelineEntities entities,
UserGroupInformation callerUgi) {
// TODO implement
if (LOG.isDebugEnabled()) {
- LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+ LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
}
}
+
+ /**
+ * Returns the context (cluster, user, flow, flow run and application IDs)
+ * that {@code putEntities} passes to the writer together with the entities.
+ *
+ * NOTE(review): {@code putEntities} dereferences the returned value without
+ * a null check, so implementations are presumably expected to return a
+ * non-null context - confirm.
+ */
+ protected abstract TimelineCollectorContext getTimelineEntityContext();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
new file mode 100644
index 0000000..c1a10a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+/**
+ * Plain mutable holder for the identifiers that describe where an
+ * application's timeline data belongs: cluster ID, user ID, flow ID, flow
+ * run ID and application ID. All values are stored as strings; the class
+ * performs no validation and no synchronization of its own.
+ */
+public class TimelineCollectorContext {
+
+ private String clusterId;
+ private String userId;
+ private String flowId;
+ private String flowRunId;
+ private String appId;
+
+ /**
+ * Creates a context with every field set to {@code null}.
+ */
+ public TimelineCollectorContext() {
+ this(null, null, null, null, null);
+ }
+
+ /**
+ * Creates a context with the given values; each is stored as passed in.
+ *
+ * @param clusterId the cluster ID
+ * @param userId the user ID
+ * @param flowId the flow ID
+ * @param flowRunId the flow run ID
+ * @param appId the application ID
+ */
+ public TimelineCollectorContext(String clusterId, String userId,
+ String flowId, String flowRunId, String appId) {
+ this.clusterId = clusterId;
+ this.userId = userId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ this.appId = appId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public void setFlowId(String flowId) {
+ this.flowId = flowId;
+ }
+
+ public String getFlowRunId() {
+ return flowRunId;
+ }
+
+ public void setFlowRunId(String flowRunId) {
+ this.flowRunId = flowRunId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 3a4515e..909027e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -102,6 +104,7 @@ public class TimelineCollectorManager extends CompositeService {
@Override
protected void serviceStart() throws Exception {
+ nmCollectorService = getNMCollectorService();
startWebApp();
super.serviceStart();
}
@@ -151,11 +154,11 @@ public class TimelineCollectorManager extends CompositeService {
// Report to NM if a new collector is added.
if (collectorIsNew) {
try {
+ updateTimelineCollectorContext(appId, collector);
reportNewCollectorToNM(appId);
} catch (Exception e) {
- // throw exception here as it cannot be used if failed report to NM
- LOG.error("Failed to report a new collector for application: " + appId +
- " to the NM Collector Service.");
+ // Throw an exception here: the collector cannot be used if communication with the NM failed
+ LOG.error("Failed to communicate with NM Collector Service for " + appId);
throw new YarnRuntimeException(e);
}
}
@@ -250,7 +253,6 @@ public class TimelineCollectorManager extends CompositeService {
private void reportNewCollectorToNM(ApplicationId appId)
throws YarnException, IOException {
- this.nmCollectorService = getNMCollectorService();
ReportNewCollectorInfoRequest request =
ReportNewCollectorInfoRequest.newInstance(appId,
this.timelineRestServerBindAddress);
@@ -259,6 +261,28 @@ public class TimelineCollectorManager extends CompositeService {
nmCollectorService.reportNewCollectorInfo(request);
}
+ private void updateTimelineCollectorContext(
+ ApplicationId appId, TimelineCollector collector)
+ throws YarnException, IOException {
+ GetTimelineCollectorContextRequest request =
+ GetTimelineCollectorContextRequest.newInstance(appId);
+ LOG.info("Get timeline collector context for " + appId);
+ GetTimelineCollectorContextResponse response =
+ nmCollectorService.getTimelineCollectorContext(request);
+ String userId = response.getUserId();
+ if (userId != null && !userId.isEmpty()) {
+ collector.getTimelineEntityContext().setUserId(userId);
+ }
+ String flowId = response.getFlowId();
+ if (flowId != null && !flowId.isEmpty()) {
+ collector.getTimelineEntityContext().setFlowId(flowId);
+ }
+ String flowRunId = response.getFlowRunId();
+ if (flowRunId != null && !flowRunId.isEmpty()) {
+ collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+ }
+ }
+
@VisibleForTesting
protected CollectorNodemanagerProtocol getNMCollectorService() {
Configuration conf = getConfig();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db2f0238/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 5adae71..0f51656 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -138,7 +138,7 @@ public class TimelineCollectorWebService {
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
}
- collector.postEntities(entities, callerUgi);
+ collector.putEntities(entities, callerUgi);
return Response.ok().build();
} catch (Exception e) {
LOG.error("Error putting entities", e);