You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ju...@apache.org on 2014/08/06 07:48:32 UTC
svn commit: r1616100 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/c...
Author: junping_du
Date: Wed Aug 6 05:48:32 2014
New Revision: 1616100
URL: http://svn.apache.org/r1616100
Log:
YARN-2298. Move TimelineClient to yarn-common project (Contributed by Zhijie Shen)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1616100&r1=1616099&r2=1616100&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Aug 6 05:48:32 2014
@@ -88,6 +88,9 @@ Release 2.6.0 - UNRELEASED
YARN-2370. Fix comment in o.a.h.y.server.resourcemanager.schedulerAppSchedulingInfo
(Wenwu Peng via junping_du)
+ YARN-2298. Move TimelineClient to yarn-common project (Zhijie Shen via
+ junping_du)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1616100&r1=1616099&r2=1616100&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml Wed Aug 6 05:48:32 2014
@@ -57,14 +57,6 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml?rev=1616100&r1=1616099&r2=1616100&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml Wed Aug 6 05:48:32 2014
@@ -67,10 +67,18 @@
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</dependency>
<dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities.
+ */
+@Public
+@Unstable
+public abstract class TimelineClient extends AbstractService {
+
+ @Public
+ public static TimelineClient createTimelineClient() {
+ TimelineClient client = new TimelineClientImpl();
+ return client;
+ }
+
+ @Private
+ protected TimelineClient(String name) {
+ super(name);
+ }
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * server. It is a blocking API. The method will not return until it gets the
+ * response from the timeline server.
+ * </p>
+ *
+ * @param entities
+ * the collection of {@link TimelineEntity}
+ * @return the error information if the sent entities are not correctly stored
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Get a delegation token so as to be able to talk to the timeline server in a
+ * secure way.
+ * </p>
+ *
+ * @param renewer
+ * Address of the renewer who can renew these tokens when needed by
+ * securely talking to the timeline server
+ * @return a delegation token ({@link Token}) that can be used to talk to the
+ * timeline server
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract Token<TimelineDelegationTokenIdentifier> getDelegationToken(
+ String renewer) throws IOException, YarnException;
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse;
+import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A <code>KerberosAuthenticator</code> subclass that fallback to
+ * {@link TimelineAuthenticationConsts}.
+ */
+@Private
+@Unstable
+public class TimelineAuthenticator extends KerberosAuthenticator {
+
+ private static ObjectMapper mapper;
+
+ static {
+ mapper = new ObjectMapper();
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+ }
+
+ /**
+ * Returns the fallback authenticator if the server does not use Kerberos
+ * SPNEGO HTTP authentication.
+ *
+ * @return a {@link TimelineAuthenticationConsts} instance.
+ */
+ @Override
+ protected Authenticator getFallBackAuthenticator() {
+ return new TimelineAuthenticator();
+ }
+
+ public static void injectDelegationToken(Map<String, String> params,
+ Token<?> dtToken)
+ throws IOException {
+ if (dtToken != null) {
+ params.put(TimelineAuthenticationConsts.DELEGATION_PARAM,
+ dtToken.encodeToUrlString());
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ boolean hasDelegationToken(URL url) {
+ if (url.getQuery() == null) {
+ return false;
+ } else {
+ return url.getQuery().contains(
+ TimelineAuthenticationConsts.DELEGATION_PARAM + "=");
+ }
+ }
+
+ @Override
+ public void authenticate(URL url, AuthenticatedURL.Token token)
+ throws IOException, AuthenticationException {
+ if (!hasDelegationToken(url)) {
+ super.authenticate(url, token);
+ }
+ }
+
+ public static Token<TimelineDelegationTokenIdentifier> getDelegationToken(
+ URL url, AuthenticatedURL.Token token, String renewer) throws IOException {
+ TimelineDelegationTokenOperation op =
+ TimelineDelegationTokenOperation.GETDELEGATIONTOKEN;
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(TimelineAuthenticationConsts.OP_PARAM, op.toString());
+ params.put(TimelineAuthenticationConsts.RENEWER_PARAM, renewer);
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(op.getHttpMethod());
+ TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
+ if (!dtRes.getType().equals(
+ TimelineAuthenticationConsts.DELEGATION_TOKEN_URL)) {
+ throw new IOException("The response content is not expected: "
+ + dtRes.getContent());
+ }
+ String tokenStr = dtRes.getContent().toString();
+ Token<TimelineDelegationTokenIdentifier> dToken =
+ new Token<TimelineDelegationTokenIdentifier>();
+ dToken.decodeFromUrlString(tokenStr);
+ return dToken;
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ public static long renewDelegationToken(URL url,
+ AuthenticatedURL.Token token,
+ Token<TimelineDelegationTokenIdentifier> dToken) throws IOException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(TimelineAuthenticationConsts.OP_PARAM,
+ TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
+ params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
+ dToken.encodeToUrlString());
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(
+ TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.getHttpMethod());
+ TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
+ if (!dtRes.getType().equals(
+ TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME)) {
+ throw new IOException("The response content is not expected: "
+ + dtRes.getContent());
+ }
+ return Long.valueOf(dtRes.getContent().toString());
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ public static void cancelDelegationToken(URL url,
+ AuthenticatedURL.Token token,
+ Token<TimelineDelegationTokenIdentifier> dToken) throws IOException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(TimelineAuthenticationConsts.OP_PARAM,
+ TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
+ params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
+ dToken.encodeToUrlString());
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN
+ .getHttpMethod());
+ validateAndParseResponse(conn);
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ /**
+ * Convenience method that appends parameters an HTTP <code>URL</code>.
+ *
+ * @param url
+ * the url.
+ * @param params
+ * the query string parameters.
+ *
+ * @return a <code>URL</code>
+ *
+ * @throws IOException
+ * thrown if an IO error occurs.
+ */
+ public static URL appendParams(URL url, Map<String, String> params)
+ throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append(url);
+ String separator = url.toString().contains("?") ? "&" : "?";
+ for (Map.Entry<String, String> entry : params.entrySet()) {
+ sb.append(separator).append(entry.getKey()).append("=").
+ append(URLEncoder.encode(entry.getValue(), "UTF8"));
+ separator = "&";
+ }
+ return new URL(sb.toString());
+ }
+
+ /**
+ * Validates the response of an <code>HttpURLConnection</code>. If the current
+ * status code is not 200, it will throw an exception with a detail message
+ * using Server side error messages if available. Otherwise,
+ * {@link TimelineDelegationTokenResponse} will be parsed and returned.
+ *
+ * @param conn
+ * the <code>HttpURLConnection</code>.
+ * @return
+ * @throws IOException
+ * thrown if the current status code is not 200 or the JSON response
+ * cannot be parsed correctly
+ */
+ private static TimelineDelegationTokenResponse validateAndParseResponse(
+ HttpURLConnection conn) throws IOException {
+ int status = conn.getResponseCode();
+ JsonNode json = mapper.readTree(conn.getInputStream());
+ if (status == HttpURLConnection.HTTP_OK) {
+ return mapper.readValue(json, TimelineDelegationTokenResponse.class);
+ } else {
+ // If the status code is not 200, some thing wrong should happen at the
+ // server side, the JSON content is going to contain exception details.
+ // We can use the JSON content to reconstruct the exception object.
+ try {
+ String message =
+ json.get(TimelineAuthenticationConsts.ERROR_MESSAGE_JSON)
+ .getTextValue();
+ String exception =
+ json.get(TimelineAuthenticationConsts.ERROR_EXCEPTION_JSON)
+ .getTextValue();
+ String className =
+ json.get(TimelineAuthenticationConsts.ERROR_CLASSNAME_JSON)
+ .getTextValue();
+
+ try {
+ ClassLoader cl = TimelineAuthenticator.class.getClassLoader();
+ Class<?> klass = cl.loadClass(className);
+ Constructor<?> constr = klass.getConstructor(String.class);
+ throw (IOException) constr.newInstance(message);
+ } catch (IOException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new IOException(MessageFormat.format("{0} - {1}", exception,
+ message));
+ }
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof IOException) {
+ throw (IOException) ex.getCause();
+ }
+ throw new IOException(
+ MessageFormat.format("HTTP status [{0}], {1}",
+ status, conn.getResponseMessage()));
+ }
+ }
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+@Private
+@Unstable
+public class TimelineClientImpl extends TimelineClient {
+
+ private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
+ private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
+ private static final String URL_PARAM_USER_NAME = "user.name";
+ private static final Joiner JOINER = Joiner.on("");
+ private static Options opts;
+ static {
+ opts = new Options();
+ opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
+ opts.getOption("put").setArgName("Path to the JSON file");
+ opts.addOption("help", false, "Print usage");
+ }
+
+ private Client client;
+ private URI resURI;
+ private boolean isEnabled;
+ private KerberosAuthenticatedURLConnectionFactory urlFactory;
+
+ public TimelineClientImpl() {
+ super(TimelineClientImpl.class.getName());
+ ClientConfig cc = new DefaultClientConfig();
+ cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ urlFactory = new KerberosAuthenticatedURLConnectionFactory();
+ client = new Client(new URLConnectionClientHandler(urlFactory), cc);
+ } else {
+ client = new Client(new URLConnectionClientHandler(
+ new PseudoAuthenticatedURLConnectionFactory()), cc);
+ }
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ isEnabled = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+ if (!isEnabled) {
+ LOG.info("Timeline service is not enabled");
+ } else {
+ if (YarnConfiguration.useHttps(conf)) {
+ resURI = URI
+ .create(JOINER.join("https://", conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
+ RESOURCE_URI_STR));
+ } else {
+ resURI = URI.create(JOINER.join("http://", conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
+ RESOURCE_URI_STR));
+ }
+ if (UserGroupInformation.isSecurityEnabled()) {
+ urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf));
+ }
+ LOG.info("Timeline service address: " + resURI);
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException {
+ if (!isEnabled) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Nothing will be put because timeline service is not enabled");
+ }
+ return new TimelinePutResponse();
+ }
+ TimelineEntities entitiesContainer = new TimelineEntities();
+ entitiesContainer.addEntities(Arrays.asList(entities));
+ ClientResponse resp;
+ try {
+ resp = doPostingEntities(entitiesContainer);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw re;
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ return resp.getEntity(TimelinePutResponse.class);
+ }
+
+ @Override
+ public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
+ String renewer) throws IOException, YarnException {
+ return TimelineAuthenticator.getDelegationToken(resURI.toURL(),
+ urlFactory.token, renewer);
+ }
+
+ @Private
+ @VisibleForTesting
+ public ClientResponse doPostingEntities(TimelineEntities entities) {
+ WebResource webResource = client.resource(resURI);
+ return webResource.accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class, entities);
+ }
+
+ private static class PseudoAuthenticatedURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(URL_PARAM_USER_NAME,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ url = TimelineAuthenticator.appendParams(url, params);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("URL with delegation token: " + url);
+ }
+ return (HttpURLConnection) url.openConnection();
+ }
+
+ }
+ private static class KerberosAuthenticatedURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ private AuthenticatedURL.Token token;
+ private TimelineAuthenticator authenticator;
+ private Token<TimelineDelegationTokenIdentifier> dToken;
+ private Text service;
+
+ public KerberosAuthenticatedURLConnectionFactory() {
+ token = new AuthenticatedURL.Token();
+ authenticator = new TimelineAuthenticator();
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ try {
+ if (dToken == null) {
+ //TODO: need to take care of the renew case
+ dToken = selectToken();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Timeline delegation token: " + dToken.toString());
+ }
+ }
+ if (dToken != null) {
+ Map<String, String> params = new HashMap<String, String>();
+ TimelineAuthenticator.injectDelegationToken(params, dToken);
+ url = TimelineAuthenticator.appendParams(url, params);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("URL with delegation token: " + url);
+ }
+ }
+ return new AuthenticatedURL(authenticator).openConnection(url, token);
+ } catch (AuthenticationException e) {
+ LOG.error("Authentication failed when openning connection [" + url
+ + "] with token [" + token + "].", e);
+ throw new IOException(e);
+ }
+ }
+
+ private Token<TimelineDelegationTokenIdentifier> selectToken() {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ String msg = "Error when getting the current user";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ TimelineDelegationTokenSelector tokenSelector =
+ new TimelineDelegationTokenSelector();
+ return tokenSelector.selectToken(
+ service, ugi.getCredentials().getAllTokens());
+ }
+
+ public void setService(Text service) {
+ this.service = service;
+ }
+
+ }
+
+ public static void main(String[] argv) throws Exception {
+ CommandLine cliParser = new GnuParser().parse(opts, argv);
+ if (cliParser.hasOption("put")) {
+ String path = cliParser.getOptionValue("put");
+ if (path != null && path.length() > 0) {
+ putTimelineEntitiesInJSONFile(path);
+ return;
+ }
+ }
+ printUsage();
+ }
+
+ /**
+ * Put timeline data in a JSON file via command line.
+ *
+ * @param path
+ * path to the {@link TimelineEntities} JSON file
+ */
+ private static void putTimelineEntitiesInJSONFile(String path) {
+ File jsonFile = new File(path);
+ if (!jsonFile.exists()) {
+ System.out.println("Error: File [" + jsonFile.getAbsolutePath()
+ + "] doesn't exist");
+ return;
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+ TimelineEntities entities = null;
+ try {
+ entities = mapper.readValue(jsonFile, TimelineEntities.class);
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace(System.err);
+ return;
+ }
+ Configuration conf = new YarnConfiguration();
+ TimelineClient client = TimelineClient.createTimelineClient();
+ client.init(conf);
+ client.start();
+ try {
+ if (UserGroupInformation.isSecurityEnabled()
+ && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
+ Token<TimelineDelegationTokenIdentifier> token =
+ client.getDelegationToken(
+ UserGroupInformation.getCurrentUser().getUserName());
+ UserGroupInformation.getCurrentUser().addToken(token);
+ }
+ TimelinePutResponse response = client.putEntities(
+ entities.getEntities().toArray(
+ new TimelineEntity[entities.getEntities().size()]));
+ if (response.getErrors().size() == 0) {
+ System.out.println("Timeline data is successfully put");
+ } else {
+ for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
+ System.out.println("TimelineEntity [" + error.getEntityType() + ":" +
+ error.getEntityId() + "] is not successfully put. Error code: " +
+ error.getErrorCode());
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace(System.err);
+ } finally {
+ client.stop();
+ }
+ }
+
+ /**
+ * Helper function to print out usage
+ */
+ private static void printUsage() {
+ new HelpFormatter().printHelp("TimelineClient", opts);
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+import org.apache.hadoop.classification.InterfaceAudience;
+
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.net.URL;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineAuthenticator {
+
+ @Test
+ public void testHasDelegationTokens() throws Exception {
+ TimelineAuthenticator authenticator = new TimelineAuthenticator();
+ Assert.assertFalse(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource")));
+ Assert.assertFalse(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?other=xxxx")));
+ Assert.assertTrue(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?delegation=yyyy")));
+ Assert.assertTrue(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?other=xxxx&delegation=yyyy")));
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java?rev=1616100&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java Wed Aug 6 05:48:32 2014
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.net.ConnectException;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClient {
+
+ private TimelineClientImpl client;
+
+ @Before
+ public void setup() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ client = createTimelineClient(conf);
+ }
+
+ @After
+ public void tearDown() {
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ mockClientResponse(client, ClientResponse.Status.OK, false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesWithError() throws Exception {
+ mockClientResponse(client, ClientResponse.Status.OK, true, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(1, response.getErrors().size());
+ Assert.assertEquals("test entity id", response.getErrors().get(0)
+ .getEntityId());
+ Assert.assertEquals("test entity type", response.getErrors().get(0)
+ .getEntityType());
+ Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
+ response.getErrors().get(0).getErrorCode());
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesNoResponse() throws Exception {
+ mockClientResponse(
+ client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ try {
+ client.putEntities(generateEntity());
+ Assert.fail("Exception is expected");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Failed to get the response from the timeline server."));
+ }
+ }
+
+ @Test
+ public void testPostEntitiesConnectionRefused() throws Exception {
+ mockClientResponse(client, null, false, true);
+ try {
+ client.putEntities(generateEntity());
+ Assert.fail("RuntimeException is expected");
+ } catch (RuntimeException re) {
+ Assert.assertTrue(re instanceof ClientHandlerException);
+ }
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceNotEnabled() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+ TimelineClientImpl client = createTimelineClient(conf);
+ mockClientResponse(
+ client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert.fail(
+ "putEntities should already return before throwing the exception");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceDefaultNotEnabled()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ // Unset the timeline service's enabled properties.
+ // Make sure default value is pickup up
+ conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
+ TimelineClientImpl client = createTimelineClient(conf);
+ mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
+ false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert
+ .fail("putEntities should already return before throwing the exception");
+ }
+ }
+
+ private static ClientResponse mockClientResponse(TimelineClientImpl client,
+ ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) {
+ ClientResponse response = mock(ClientResponse.class);
+ if (hasRuntimeError) {
+ doThrow(new ClientHandlerException(new ConnectException())).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ return response;
+ }
+ doReturn(response).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ when(response.getClientResponseStatus()).thenReturn(status);
+ TimelinePutResponse.TimelinePutError error =
+ new TimelinePutResponse.TimelinePutError();
+ error.setEntityId("test entity id");
+ error.setEntityType("test entity type");
+ error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);
+ TimelinePutResponse putResponse = new TimelinePutResponse();
+ if (hasError) {
+ putResponse.addError(error);
+ }
+ when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse);
+ return response;
+ }
+
+ private static TimelineEntity generateEntity() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId("entity id");
+ entity.setEntityType("entity type");
+ entity.setStartTime(System.currentTimeMillis());
+ for (int i = 0; i < 2; ++i) {
+ TimelineEvent event = new TimelineEvent();
+ event.setTimestamp(System.currentTimeMillis());
+ event.setEventType("test event type " + i);
+ event.addEventInfo("key1", "val1");
+ event.addEventInfo("key2", "val2");
+ entity.addEvent(event);
+ }
+ entity.addRelatedEntity("test ref type 1", "test ref id 1");
+ entity.addRelatedEntity("test ref type 2", "test ref id 2");
+ entity.addPrimaryFilter("pkey1", "pval1");
+ entity.addPrimaryFilter("pkey2", "pval2");
+ entity.addOtherInfo("okey1", "oval1");
+ entity.addOtherInfo("okey2", "oval2");
+ return entity;
+ }
+
+ private static TimelineClientImpl createTimelineClient(
+ YarnConfiguration conf) {
+ TimelineClientImpl client =
+ spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
+}