You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/02/25 11:26:54 UTC
hadoop git commit: YARN-3240. Implement client API to put generic
entities. Contributed by Zhijie Shen
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 fdff5d262 -> 4487da249
YARN-3240. Implement client API to put generic entities. Contributed by Zhijie Shen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4487da24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4487da24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4487da24
Branch: refs/heads/YARN-2928
Commit: 4487da249f448d5c67b712cd0aa723e764eed77d
Parents: fdff5d2
Author: Junping Du <ju...@apache.org>
Authored: Wed Feb 25 02:40:55 2015 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Wed Feb 25 02:40:55 2015 -0800
----------------------------------------------------------------------
hadoop-project/pom.xml | 7 ++
hadoop-yarn-project/CHANGES.txt | 3 +
.../timelineservice/TimelineEntities.java | 58 ++++++++++
.../hadoop-yarn/hadoop-yarn-common/pom.xml | 1 +
.../hadoop/yarn/client/api/TimelineClient.java | 58 +++++++++-
.../client/api/impl/TimelineClientImpl.java | 109 ++++++++++++++++---
.../TestTimelineServiceRecords.java | 7 ++
.../hadoop-yarn-server-tests/pom.xml | 12 ++
.../TestTimelineServiceClientIntegration.java | 54 +++++++++
.../hadoop-yarn-server-timelineservice/pom.xml | 11 ++
.../aggregator/BaseAggregatorService.java | 7 +-
.../aggregator/PerNodeAggregatorServer.java | 9 +-
.../aggregator/PerNodeAggregatorWebService.java | 54 +++++----
13 files changed, 342 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 5aaccbc..7c09722 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -288,6 +288,13 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b497dd7..2378dc9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -14,6 +14,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3041. Added the overall data model of timeline service next gen.
(zjshen)
+ YARN-3240. Implement client API to put generic entities. (Zhijie Shen via
+ junping_du)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
new file mode 100644
index 0000000..39504cc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashSet;
+import java.util.Set;
+
+@XmlRootElement(name = "entities")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineEntities {
+
+ private Set<TimelineEntity> entities = new HashSet<>();
+
+ public TimelineEntities() {
+
+ }
+
+ @XmlElement(name = "entities")
+ public Set<TimelineEntity> getEntities() {
+ return entities;
+ }
+
+ public void setEntities(Set<TimelineEntity> entities) {
+ this.entities = entities;
+ }
+
+ public void addEntities(Set<TimelineEntity> entities) {
+ this.entities.addAll(entities);
+ }
+
+ public void addEntity(TimelineEntity entity) {
+ entities.add(entity);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index 6bc5b71..6357c2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -207,6 +207,7 @@
<exclude>src/main/resources/webapps/jobhistory/.keep</exclude>
<exclude>src/main/resources/webapps/yarn/.keep</exclude>
<exclude>src/main/resources/webapps/applicationhistory/.keep</exclude>
+ <exclude>src/main/resources/webapps/timeline/.keep</exclude>
<exclude>src/main/resources/webapps/cluster/.keep</exclude>
<exclude>src/main/resources/webapps/test/.keep</exclude>
<exclude>src/main/resources/webapps/proxy/.keep</exclude>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index 0313f9e..d40ad7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
@@ -40,15 +41,25 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@Unstable
public abstract class TimelineClient extends AbstractService {
+ protected ApplicationId contextAppId;
+ protected String timelineServiceAddress;
+
@Public
public static TimelineClient createTimelineClient() {
TimelineClient client = new TimelineClientImpl();
return client;
}
+ @Public
+ public static TimelineClient createTimelineClient(ApplicationId appId) {
+ TimelineClient client = new TimelineClientImpl(appId);
+ return client;
+ }
+
@Private
- protected TimelineClient(String name) {
+ protected TimelineClient(String name, ApplicationId appId) {
super(name);
+ contextAppId = appId;
}
/**
@@ -132,4 +143,49 @@ public abstract class TimelineClient extends AbstractService {
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * aggregator. It is a blocking API. The method will not return until all the
+ * put entities have been persisted.
+ * </p>
+ *
+ * @param entities
+ * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract void putEntities(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * aggregator. It is an asynchronous API. The method will return once all the
+ * entities are received.
+ * </p>
+ *
+ * @param entities
+ * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract void putEntitiesAsync(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Update the timeline service address where the request will be sent to
+ * </p>
+ * @param address
+ * the timeline service address
+ */
+ public void setTimelineServiceAddress(String address) {
+ timelineServiceAddress = address;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index af68492..d5faaac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -35,6 +35,7 @@ import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -54,6 +55,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -80,13 +82,16 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
@Private
@Unstable
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
- private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
+ private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
+ private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
private static final Joiner JOINER = Joiner.on("");
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
@@ -107,7 +112,6 @@ public class TimelineClientImpl extends TimelineClient {
private ConnectionConfigurator connConfigurator;
private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token;
- private URI resURI;
@Private
@VisibleForTesting
@@ -248,7 +252,11 @@ public class TimelineClientImpl extends TimelineClient {
}
public TimelineClientImpl() {
- super(TimelineClientImpl.class.getName());
+ super(TimelineClientImpl.class.getName(), null);
+ }
+
+ public TimelineClientImpl(ApplicationId applicationId) {
+ super(TimelineClientImpl.class.getName(), applicationId);
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -270,18 +278,15 @@ public class TimelineClientImpl extends TimelineClient {
client.addFilter(retryFilter);
if (YarnConfiguration.useHttps(conf)) {
- resURI = URI
- .create(JOINER.join("https://", conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
- RESOURCE_URI_STR));
+ timelineServiceAddress = conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
- resURI = URI.create(JOINER.join("http://", conf.get(
+ timelineServiceAddress = conf.get(
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
- RESOURCE_URI_STR));
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
- LOG.info("Timeline service address: " + resURI);
+ LOG.info("Timeline service address: " + timelineServiceAddress);
super.serviceInit(conf);
}
@@ -294,6 +299,39 @@ public class TimelineClientImpl extends TimelineClient {
return resp.getEntity(TimelinePutResponse.class);
}
+ @Override
+ public void putEntities(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ putEntities(false, entities);
+ }
+
+ @Override
+ public void putEntitiesAsync(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ putEntities(true, entities);
+ }
+
+ private void putEntities(boolean async,
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+ entitiesContainer =
+ new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
+ for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
+ entitiesContainer.addEntity(entity);
+ }
+ MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+ if (contextAppId != null) {
+ params.add("appid", contextAppId.toString());
+ }
+ if (async) {
+ params.add("async", Boolean.TRUE.toString());
+ }
+ putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
+ "entities", params, entitiesContainer);
+ }
@Override
public void putDomain(TimelineDomain domain) throws IOException,
@@ -301,6 +339,36 @@ public class TimelineClientImpl extends TimelineClient {
doPosting(domain, "domain");
}
+ private void putObjects(
+ URI base, String path, MultivaluedMap<String, String> params, Object obj)
+ throws IOException, YarnException {
+ ClientResponse resp;
+ try {
+ resp = client.resource(base).path(path).queryParams(params)
+ .accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, obj);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw new IOException(re);
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response:\n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ }
+
private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException {
ClientResponse resp;
try {
@@ -346,7 +414,8 @@ public class TimelineClientImpl extends TimelineClient {
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
return (Token) authUrl.getDelegationToken(
- resURI.toURL(), token, renewer, doAsUser);
+ constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
+ token, renewer, doAsUser);
}
};
return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
@@ -380,7 +449,7 @@ public class TimelineClientImpl extends TimelineClient {
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
final URI serviceURI = new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
@@ -416,7 +485,7 @@ public class TimelineClientImpl extends TimelineClient {
new DelegationTokenAuthenticatedURL(authenticator,
connConfigurator);
final URI serviceURI = new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
@@ -461,7 +530,8 @@ public class TimelineClientImpl extends TimelineClient {
@Private
@VisibleForTesting
public ClientResponse doPostingObject(Object object, String path) {
- WebResource webResource = client.resource(resURI);
+ WebResource webResource = client.resource(
+ constructResURI(getConfig(), timelineServiceAddress, false));
if (path == null) {
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
@@ -557,6 +627,13 @@ public class TimelineClientImpl extends TimelineClient {
connection.setReadTimeout(socketTimeout);
}
+ private static URI constructResURI(
+ Configuration conf, String address, boolean v2) {
+ return URI.create(
+ JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+ address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
+ }
+
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 6bab239..4f8ab94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -76,6 +76,13 @@ public class TestTimelineServiceRecords {
entity.addIsRelatedToEntity("test type 4", "test id 4");
entity.addIsRelatedToEntity("test type 5", "test id 5");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true));
+
+ TimelineEntities entities = new TimelineEntities();
+ TimelineEntity entity1 = new TimelineEntity();
+ entities.addEntity(entity1);
+ TimelineEntity entity2 = new TimelineEntity();
+ entities.addEntity(entity2);
+ LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index 2ac274d..ae5efa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -88,6 +88,18 @@
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ </dependency>
+ <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
new file mode 100644
index 0000000..a5159a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn.server.timelineservice;
+
+
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class TestTimelineServiceClientIntegration {
+ private static PerNodeAggregatorServer server;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ try {
+ server = PerNodeAggregatorServer.launchServer(new String[0]);
+ server.addApplication(ApplicationId.newInstance(0, 1));
+ } catch (ExitUtil.ExitException e) {
+ fail();
+ }
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Test
+ public void testPutEntities() throws Exception {
+ TimelineClient client =
+ TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+ try {
+ client.init(new YarnConfiguration());
+ client.start();
+ TimelineEntity entity = new TimelineEntity();
+ entity.setType("test entity type");
+ entity.setId("test entity id");
+ client.putEntities(entity);
+ client.putEntitiesAsync(entity);
+ } catch(Exception e) {
+ fail();
+ } finally {
+ client.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 3154ca3..26790f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -114,6 +114,17 @@
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
index 994c66f..46e5574 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
/**
* Service that handles writes to the timeline service and writes them to the
@@ -70,16 +69,14 @@ public class BaseAggregatorService extends CompositeService {
*
* @param entities entities to post
* @param callerUgi the caller UGI
- * @return the response that contains the result of the post.
*/
- public TimelinePutResponse postEntities(TimelineEntities entities,
+ public void postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) {
// TODO implement
if (LOG.isDebugEnabled()) {
LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
}
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
index 6371e82..ef30b22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.nio.ByteBuffer;
+import com.google.inject.Inject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -39,9 +40,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.WebApp;
-import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.*;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -120,6 +119,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
extends WebApp implements YarnWebParams {
@Override
public void setup() {
+ bind(YarnJacksonJaxbJsonProvider.class);
+ bind(GenericExceptionHandler.class);
bind(PerNodeAggregatorWebService.class);
// bind to the global singleton
bind(AppLevelServiceManager.class).
@@ -214,7 +215,7 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
}
@VisibleForTesting
- static PerNodeAggregatorServer launchServer(String[] args) {
+ public static PerNodeAggregatorServer launchServer(String[] args) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4487da24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
index 2d96699..28e6a52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
@@ -20,12 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -40,14 +35,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import java.net.URI;
+
/**
* The main per-node REST end point for timeline service writes. It is
* essentially a container service that routes requests to the appropriate
@@ -112,11 +110,14 @@ public class PerNodeAggregatorWebService {
* the request to the app level aggregator. It expects an application as a
* context.
*/
- @POST
+ @PUT
+ @Path("/entities")
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public TimelinePutResponse postEntities(
+ public Response putEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @QueryParam("async") String async,
+ @QueryParam("appid") String appId,
TimelineEntities entities) {
init(res);
UserGroupInformation callerUgi = getUser(req);
@@ -127,13 +128,20 @@ public class PerNodeAggregatorWebService {
}
// TODO how to express async posts and handle them
+ boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
try {
- AppLevelAggregatorService service = getAggregatorService(req);
+ appId = parseApplicationId(appId);
+ if (appId == null) {
+ return Response.status(Response.Status.BAD_REQUEST).build();
+ }
+ AppLevelAggregatorService service = serviceManager.getService(appId);
if (service == null) {
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
}
- return service.postEntities(entities, callerUgi);
+ service.postEntities(entities, callerUgi);
+ return Response.ok().build();
} catch (Exception e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
@@ -141,16 +149,18 @@ public class PerNodeAggregatorWebService {
}
}
- private AppLevelAggregatorService
- getAggregatorService(HttpServletRequest req) {
- String appIdString = getApplicationId(req);
- return serviceManager.getService(appIdString);
- }
-
- private String getApplicationId(HttpServletRequest req) {
- // TODO the application id from the request
- // (most likely from the URI)
- return null;
+ private String parseApplicationId(String appId) {
+ // Make sure the appId is not null and is valid
+ ApplicationId appID;
+ try {
+ if (appId != null) {
+ return ConverterUtils.toApplicationId(appId.trim()).toString();
+ } else {
+ return null;
+ }
+ } catch (Exception e) {
+ return null;
+ }
}
private void init(HttpServletResponse response) {