You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2017/02/16 19:41:26 UTC
[1/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and
TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed
by Naganarasimha G R.
Repository: hadoop
Updated Branches:
refs/heads/trunk 5690b51ef -> 4fa1afdb8
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
new file mode 100644
index 0000000..b5b5f77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientRequest;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+/**
+ * Utility Connector class which is used by timeline clients to securely get
+ * connected to the timeline server.
+ *
+ */
+public class TimelineConnector extends AbstractService {
+
+ private static final Joiner JOINER = Joiner.on("");
+ private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
+ public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+
+ private SSLFactory sslFactory;
+ private Client client;
+ private ConnectionConfigurator connConfigurator;
+ private DelegationTokenAuthenticator authenticator;
+ private DelegationTokenAuthenticatedURL.Token token;
+ private UserGroupInformation authUgi;
+ private String doAsUser;
+ @VisibleForTesting
+ TimelineClientConnectionRetry connectionRetry;
+ private boolean requireConnectionRetry;
+
+ public TimelineConnector(boolean requireConnectionRetry,
+ UserGroupInformation authUgi, String doAsUser,
+ DelegationTokenAuthenticatedURL.Token token) {
+ super("TimelineConnector");
+ this.requireConnectionRetry = requireConnectionRetry;
+ this.authUgi = authUgi;
+ this.doAsUser = doAsUser;
+ this.token = token;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ ClientConfig cc = new DefaultClientConfig();
+ cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+
+ sslFactory = getSSLFactory(conf);
+ connConfigurator = getConnConfigurator(sslFactory);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ authenticator = new KerberosDelegationTokenAuthenticator();
+ } else {
+ authenticator = new PseudoDelegationTokenAuthenticator();
+ }
+ authenticator.setConnectionConfigurator(connConfigurator);
+
+ connectionRetry = new TimelineClientConnectionRetry(conf);
+ client =
+ new Client(
+ new URLConnectionClientHandler(new TimelineURLConnectionFactory(
+ authUgi, authenticator, connConfigurator, token, doAsUser)),
+ cc);
+ if (requireConnectionRetry) {
+ TimelineJerseyRetryFilter retryFilter =
+ new TimelineJerseyRetryFilter(connectionRetry);
+ client.addFilter(retryFilter);
+ }
+ }
+
+ private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
+ = new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+ return conn;
+ }
+ };
+
+ private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
+ try {
+ return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
+ } catch (Exception e) {
+ LOG.debug("Cannot load customized ssl related configuration. "
+ + "Fallback to system-generic settings.", e);
+ return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+ }
+ }
+
+ private static ConnectionConfigurator initSslConnConfigurator(
+ final int timeout, SSLFactory sslFactory)
+ throws IOException, GeneralSecurityException {
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ sf = sslFactory.createSSLSocketFactory();
+ hv = sslFactory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ if (conn instanceof HttpsURLConnection) {
+ HttpsURLConnection c = (HttpsURLConnection) conn;
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ }
+ setTimeouts(conn, timeout);
+ return conn;
+ }
+ };
+ }
+
+ protected SSLFactory getSSLFactory(Configuration conf)
+ throws GeneralSecurityException, IOException {
+ SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ newSSLFactory.init();
+ return newSSLFactory;
+ }
+
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
+ connection.setConnectTimeout(socketTimeout);
+ connection.setReadTimeout(socketTimeout);
+ }
+
+ public static URI constructResURI(Configuration conf, String address,
+ String uri) {
+ return URI.create(
+ JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+ address, uri));
+ }
+
+ DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
+ return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+ }
+
+ protected void serviceStop() {
+ if (this.sslFactory != null) {
+ this.sslFactory.destroy();
+ }
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ public Object operateDelegationToken(
+ final PrivilegedExceptionAction<?> action)
+ throws IOException, YarnException {
+ // Set up the retry operation
+ TimelineClientRetryOp tokenRetryOp =
+ createRetryOpForOperateDelegationToken(action);
+
+ return connectionRetry.retryOn(tokenRetryOp);
+ }
+
+ @Private
+ @VisibleForTesting
+ TimelineClientRetryOp createRetryOpForOperateDelegationToken(
+ final PrivilegedExceptionAction<?> action) throws IOException {
+ return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
+ action);
+ }
+
+ /**
+ * Abstract class for an operation that should be retried by timeline client.
+ */
+ @Private
+ @VisibleForTesting
+ public static abstract class TimelineClientRetryOp {
+ // The operation that should be retried
+ public abstract Object run() throws IOException;
+
+ // The method to indicate if we should retry given the incoming exception
+ public abstract boolean shouldRetryOn(Exception e);
+ }
+
+ private static class TimelineURLConnectionFactory
+ implements HttpURLConnectionFactory {
+ private DelegationTokenAuthenticator authenticator;
+ private UserGroupInformation authUgi;
+ private ConnectionConfigurator connConfigurator;
+ private Token token;
+ private String doAsUser;
+
+ public TimelineURLConnectionFactory(UserGroupInformation authUgi,
+ DelegationTokenAuthenticator authenticator,
+ ConnectionConfigurator connConfigurator,
+ DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
+ this.authUgi = authUgi;
+ this.authenticator = authenticator;
+ this.connConfigurator = connConfigurator;
+ this.token = token;
+ this.doAsUser = doAsUser;
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url)
+ throws IOException {
+ authUgi.checkTGTAndReloginFromKeytab();
+ try {
+ return new DelegationTokenAuthenticatedURL(authenticator,
+ connConfigurator).openConnection(url, token, doAsUser);
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (AuthenticationException ae) {
+ throw new IOException(ae);
+ }
+ }
+
+ }
+
+ // Class to handle retry
+ // Outside this class, only visible to tests
+ @Private
+ @VisibleForTesting
+ static class TimelineClientConnectionRetry {
+
+ // maxRetries < 0 means keep trying
+ @Private
+ @VisibleForTesting
+ public int maxRetries;
+
+ @Private
+ @VisibleForTesting
+ public long retryInterval;
+
+ // Indicates if retries happened last time. Only tests should read it.
+ // In unit tests, retryOn() calls should _not_ be concurrent.
+ private boolean retried = false;
+
+ @Private
+ @VisibleForTesting
+ boolean getRetired() {
+ return retried;
+ }
+
+ // Constructor with default retry settings
+ public TimelineClientConnectionRetry(Configuration conf) {
+ Preconditions.checkArgument(
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES)
+ >= -1,
+ "%s property value should be greater than or equal to -1",
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ Preconditions.checkArgument(
+ conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.
+ DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
+ "%s property value should be greater than zero",
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ maxRetries =
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ retryInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ }
+
+ public Object retryOn(TimelineClientRetryOp op)
+ throws RuntimeException, IOException {
+ int leftRetries = maxRetries;
+ retried = false;
+
+ // keep trying
+ while (true) {
+ try {
+ // try perform the op, if fail, keep retrying
+ return op.run();
+ } catch (IOException | RuntimeException e) {
+ // break if there's no retries left
+ if (leftRetries == 0) {
+ break;
+ }
+ if (op.shouldRetryOn(e)) {
+ logException(e, leftRetries);
+ } else {
+ throw e;
+ }
+ }
+ if (leftRetries > 0) {
+ leftRetries--;
+ }
+ retried = true;
+ try {
+ // sleep for the given time interval
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ie) {
+ LOG.warn("Client retry sleep interrupted! ");
+ }
+ }
+ throw new RuntimeException("Failed to connect to timeline server. "
+ + "Connection retries limit exceeded. "
+ + "The posted timeline event may be missing");
+ };
+
+ private void logException(Exception e, int leftRetries) {
+ if (leftRetries > 0) {
+ LOG.info(
+ "Exception caught by TimelineClientConnectionRetry," + " will try "
+ + leftRetries + " more time(s).\nMessage: " + e.getMessage());
+ } else {
+ // note that maxRetries may be -1 at the very beginning
+ LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+ + " will keep retrying.\nMessage: " + e.getMessage());
+ }
+ }
+ }
+
+ private static class TimelineJerseyRetryFilter extends ClientFilter {
+ private TimelineClientConnectionRetry connectionRetry;
+
+ public TimelineJerseyRetryFilter(
+ TimelineClientConnectionRetry connectionRetry) {
+ this.connectionRetry = connectionRetry;
+ }
+
+ @Override
+ public ClientResponse handle(final ClientRequest cr)
+ throws ClientHandlerException {
+ // Set up the retry operation
+ TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
+ @Override
+ public Object run() {
+ // Try pass the request, if fail, keep retrying
+ return getNext().handle(cr);
+ }
+
+ @Override
+ public boolean shouldRetryOn(Exception e) {
+ // Only retry on connection exceptions
+ return (e instanceof ClientHandlerException)
+ && (e.getCause() instanceof ConnectException
+ || e.getCause() instanceof SocketTimeoutException);
+ }
+ };
+ try {
+ return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+ } catch (IOException e) {
+ throw new ClientHandlerException(
+ "Jersey retry failed!\nMessage: " + e.getMessage());
+ }
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public static class TimelineClientRetryOpForOperateDelegationToken
+ extends TimelineClientRetryOp {
+
+ private final UserGroupInformation authUgi;
+ private final PrivilegedExceptionAction<?> action;
+
+ public TimelineClientRetryOpForOperateDelegationToken(
+ UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
+ this.authUgi = authUgi;
+ this.action = action;
+ }
+
+ @Override
+ public Object run() throws IOException {
+ // Try pass the request, if fail, keep retrying
+ authUgi.checkTGTAndReloginFromKeytab();
+ try {
+ return authUgi.doAs(action);
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean shouldRetryOn(Exception e) {
+ // retry on connection exceptions
+ // and SocketTimeoutException
+ return (e instanceof ConnectException
+ || e instanceof SocketTimeoutException);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
new file mode 100644
index 0000000..cef7e5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ * Implementation of timeline v2 client interface.
+ *
+ */
+public class TimelineV2ClientImpl extends TimelineV2Client {
+ private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
+
+ private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+
+ private TimelineEntityDispatcher entityDispatcher;
+ private volatile String timelineServiceAddress;
+
+ // Retry parameters for identifying new timeline service
+ // TODO consider to merge with connection retry
+ private int maxServiceRetries;
+ private long serviceRetryInterval;
+
+ private TimelineConnector connector;
+
+ private ApplicationId contextAppId;
+
+ public TimelineV2ClientImpl(ApplicationId appId) {
+ super(TimelineV2ClientImpl.class.getName());
+ this.contextAppId = appId;
+ }
+
+ public ApplicationId getContextAppId() {
+ return contextAppId;
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ if (!YarnConfiguration.timelineServiceEnabled(conf)
+ || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+ throw new IOException("Timeline V2 client is not properly configured. "
+ + "Either timeline service is not enabled or version is not set to"
+ + " 2");
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUgi = ugi.getRealUser();
+ String doAsUser = null;
+ UserGroupInformation authUgi = null;
+ if (realUgi != null) {
+ authUgi = realUgi;
+ doAsUser = ugi.getShortUserName();
+ } else {
+ authUgi = ugi;
+ doAsUser = null;
+ }
+
+ // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
+ DelegationTokenAuthenticatedURL.Token token =
+ new DelegationTokenAuthenticatedURL.Token();
+ connector = new TimelineConnector(false, authUgi, doAsUser, token);
+ addIfService(connector);
+
+ // new version need to auto discovery (with retry till ATS v2 address is
+ // got).
+ maxServiceRetries =
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ serviceRetryInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ entityDispatcher = new TimelineEntityDispatcher(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ entityDispatcher.start();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ entityDispatcher.stop();
+ super.serviceStop();
+ }
+
+ @Override
+ public void putEntities(TimelineEntity... entities)
+ throws IOException, YarnException {
+ entityDispatcher.dispatchEntities(true, entities);
+ }
+
+ @Override
+ public void putEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException {
+ entityDispatcher.dispatchEntities(false, entities);
+ }
+
+ @Override
+ public void setTimelineServiceAddress(String address) {
+ this.timelineServiceAddress = address;
+ }
+
+ @Private
+ protected void putObjects(String path, MultivaluedMap<String, String> params,
+ Object obj) throws IOException, YarnException {
+
+ int retries = verifyRestEndPointAvailable();
+
+ // timelineServiceAddress could be stale, add retry logic here.
+ boolean needRetry = true;
+ while (needRetry) {
+ try {
+ URI uri = TimelineConnector.constructResURI(getConfig(),
+ timelineServiceAddress, RESOURCE_URI_STR_V2);
+ putObjects(uri, path, params, obj);
+ needRetry = false;
+ } catch (IOException e) {
+ // handle exception for timelineServiceAddress being updated.
+ checkRetryWithSleep(retries, e);
+ retries--;
+ }
+ }
+ }
+
+ /**
+ * Check if reaching to maximum of retries.
+ *
+ * @param retries
+ * @param e
+ */
+ private void checkRetryWithSleep(int retries, IOException e)
+ throws YarnException, IOException {
+ if (retries > 0) {
+ try {
+ Thread.sleep(this.serviceRetryInterval);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while retrying to connect to ATS");
+ }
+ } else {
+ StringBuilder msg =
+ new StringBuilder("TimelineClient has reached to max retry times : ");
+ msg.append(this.maxServiceRetries);
+ msg.append(" for service address: ");
+ msg.append(timelineServiceAddress);
+ LOG.error(msg.toString());
+ throw new IOException(msg.toString(), e);
+ }
+ }
+
+ protected void putObjects(URI base, String path,
+ MultivaluedMap<String, String> params, Object obj)
+ throws IOException, YarnException {
+ ClientResponse resp;
+ try {
+ resp = connector.getClient().resource(base).path(path).queryParams(params)
+ .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, obj);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg = "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw new IOException(re);
+ }
+ if (resp == null || resp.getStatusInfo()
+ .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
+ String msg =
+ "Response from the timeline server is " + ((resp == null) ? "null"
+ : "not successful," + " HTTP error code: " + resp.getStatus()
+ + ", Server response:\n" + resp.getEntity(String.class));
+ LOG.error(msg);
+ throw new YarnException(msg);
+ }
+ }
+
+ private int verifyRestEndPointAvailable() throws YarnException {
+ // timelineServiceAddress could haven't be initialized yet
+ // or stale (only for new timeline service)
+ int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+ if (timelineServiceAddress == null) {
+ String errMessage = "TimelineClient has reached to max retry times : "
+ + this.maxServiceRetries
+ + ", but failed to fetch timeline service address. Please verify"
+ + " Timeline Auxiliary Service is configured in all the NMs";
+ LOG.error(errMessage);
+ throw new YarnException(errMessage);
+ }
+ return retries;
+ }
+
+ /**
+ * Poll TimelineServiceAddress for maximum of retries times if it is null.
+ *
+ * @param retries
+ * @return the left retry times
+ * @throws IOException
+ */
+ private int pollTimelineServiceAddress(int retries) throws YarnException {
+ while (timelineServiceAddress == null && retries > 0) {
+ try {
+ Thread.sleep(this.serviceRetryInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while trying to connect ATS");
+ }
+ retries--;
+ }
+ return retries;
+ }
+
+ private final class EntitiesHolder extends FutureTask<Void> {
+ private final TimelineEntities entities;
+ private final boolean isSync;
+
+ EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+ super(new Callable<Void>() {
+ // publishEntities()
+ public Void call() throws Exception {
+ MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+ params.add("appid", getContextAppId().toString());
+ params.add("async", Boolean.toString(!isSync));
+ putObjects("entities", params, entities);
+ return null;
+ }
+ });
+ this.entities = entities;
+ this.isSync = isSync;
+ }
+
+ public boolean isSync() {
+ return isSync;
+ }
+
+ public TimelineEntities getEntities() {
+ return entities;
+ }
+ }
+
+ /**
+ * This class is responsible for collecting the timeline entities and
+ * publishing them in async.
+ */
+ private class TimelineEntityDispatcher {
+ /**
+ * Time period for which the timelineclient will wait for draining after
+ * stop.
+ */
+ private static final long DRAIN_TIME_PERIOD = 2000L;
+
+ private int numberOfAsyncsToMerge;
+ private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+ private ExecutorService executor;
+
+ TimelineEntityDispatcher(Configuration conf) {
+ timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+ numberOfAsyncsToMerge =
+ conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+ YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+ }
+
+ Runnable createRunnable() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ EntitiesHolder entitiesHolder;
+ while (!Thread.currentThread().isInterrupted()) {
+ // Merge all the async calls and make one push, but if its sync
+ // call push immediately
+ try {
+ entitiesHolder = timelineEntityQueue.take();
+ } catch (InterruptedException ie) {
+ LOG.info("Timeline dispatcher thread was interrupted ");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (entitiesHolder != null) {
+ publishWithoutBlockingOnQueue(entitiesHolder);
+ }
+ }
+ } finally {
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.info("Yet to publish " + timelineEntityQueue.size()
+ + " timelineEntities, draining them now. ");
+ }
+ // Try to drain the remaining entities to be published @ the max for
+ // 2 seconds
+ long timeTillweDrain =
+ System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+ while (!timelineEntityQueue.isEmpty()) {
+ publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+ if (System.currentTimeMillis() > timeTillweDrain) {
+ // time elapsed stop publishing further....
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.warn("Time to drain elapsed! Remaining "
+ + timelineEntityQueue.size() + "timelineEntities will not"
+ + " be published");
+ // if some entities were not drained then we need interrupt
+ // the threads which had put sync EntityHolders to the queue.
+ EntitiesHolder nextEntityInTheQueue = null;
+ while ((nextEntityInTheQueue =
+ timelineEntityQueue.poll()) != null) {
+ nextEntityInTheQueue.cancel(true);
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Publishes the given EntitiesHolder and return immediately if sync
+ * call, else tries to fetch the EntitiesHolder from the queue in non
+ * blocking fashion and collate the Entities if possible before
+ * publishing through REST.
+ *
+ * @param entitiesHolder
+ */
+ private void publishWithoutBlockingOnQueue(
+ EntitiesHolder entitiesHolder) {
+ if (entitiesHolder.isSync()) {
+ entitiesHolder.run();
+ return;
+ }
+ int count = 1;
+ while (true) {
+ // loop till we find a sync put Entities or there is nothing
+ // to take
+ EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+ if (nextEntityInTheQueue == null) {
+ // Nothing in the queue just publish and get back to the
+ // blocked wait state
+ entitiesHolder.run();
+ break;
+ } else if (nextEntityInTheQueue.isSync()) {
+ // flush all the prev async entities first
+ entitiesHolder.run();
+ // and then flush the sync entity
+ nextEntityInTheQueue.run();
+ break;
+ } else {
+ // append all async entities together and then flush
+ entitiesHolder.getEntities().addEntities(
+ nextEntityInTheQueue.getEntities().getEntities());
+ count++;
+ if (count == numberOfAsyncsToMerge) {
+ // Flush the entities if the number of the async
+ // putEntites merged reaches the desired limit. To avoid
+ // collecting multiple entities and delaying for a long
+ // time.
+ entitiesHolder.run();
+ break;
+ }
+ }
+ }
+ }
+ };
+ }
+
+ public void dispatchEntities(boolean sync,
+ TimelineEntity[] entitiesTobePublished) throws YarnException {
+ if (executor.isShutdown()) {
+ throw new YarnException("Timeline client is in the process of stopping,"
+ + " not accepting any more TimelineEntities");
+ }
+
+ // wrap all TimelineEntity into TimelineEntities object
+ TimelineEntities entities = new TimelineEntities();
+ for (TimelineEntity entity : entitiesTobePublished) {
+ entities.addEntity(entity);
+ }
+
+ // created a holder and place it in queue
+ EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+ try {
+ timelineEntityQueue.put(entitiesHolder);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException(
+ "Failed while adding entity to the queue for publishing", e);
+ }
+
+ if (sync) {
+ // In sync call we need to wait till its published and if any error then
+ // throw it back
+ try {
+ entitiesHolder.get();
+ } catch (ExecutionException e) {
+ throw new YarnException("Failed while publishing entity",
+ e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while publishing entity", e);
+ }
+ }
+ }
+
+ public void start() {
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(createRunnable());
+ }
+
+ public void stop() {
+ LOG.info("Stopping TimelineClient.");
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index bfc7cbd..f42c078 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -215,11 +215,11 @@ public class TestTimelineClient {
+ "Timeline server should be off to run this test. ");
} catch (RuntimeException ce) {
Assert.assertTrue(
- "Handler exception for reason other than retry: " + ce.getMessage(),
- ce.getMessage().contains("Connection retries limit exceeded"));
+ "Handler exception for reason other than retry: " + ce.getMessage(),
+ ce.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
- Assert.assertTrue("Retry filter didn't perform any retries! ", client
- .connectionRetry.getRetired());
+ Assert.assertTrue("Retry filter didn't perform any retries! ",
+ client.connector.connectionRetry.getRetired());
}
}
@@ -318,7 +318,7 @@ public class TestTimelineClient {
.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ",
- client.connectionRetry.getRetired());
+ client.connector.connectionRetry.getRetired());
}
public static ClientResponse mockEntityClientResponse(
@@ -419,17 +419,26 @@ public class TestTimelineClient {
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
YarnConfiguration conf) {
TimelineClientImpl client = new TimelineClientImpl() {
-
@Override
- public TimelineClientRetryOp
- createTimelineClientRetryOpForOperateDelegationToken(
- final PrivilegedExceptionAction<?> action) throws IOException {
- TimelineClientRetryOpForOperateDelegationToken op =
- spy(new TimelineClientRetryOpForOperateDelegationToken(
- UserGroupInformation.getCurrentUser(), action));
- doThrow(new SocketTimeoutException("Test socketTimeoutException"))
- .when(op).run();
- return op;
+ protected TimelineConnector createTimelineConnector() {
+ TimelineConnector connector =
+ new TimelineConnector(true, authUgi, doAsUser, token) {
+ @Override
+ public TimelineClientRetryOp
+ createRetryOpForOperateDelegationToken(
+ final PrivilegedExceptionAction<?> action)
+ throws IOException {
+ TimelineClientRetryOpForOperateDelegationToken op =
+ spy(new TimelineClientRetryOpForOperateDelegationToken(
+ UserGroupInformation.getCurrentUser(), action));
+ doThrow(
+ new SocketTimeoutException("Test socketTimeoutException"))
+ .when(op).run();
+ return op;
+ }
+ };
+ addIfService(connector);
+ return connector;
}
};
client.init(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index 5813340..c5b02fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
@@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
}
private class TestV2TimelineClientForExceptionHandling
- extends TimelineClientImpl {
+ extends TimelineV2ClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 8994582..ce2c656 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -76,7 +76,7 @@ public class NMTimelinePublisher extends CompositeService {
private String httpAddress;
- private final Map<ApplicationId, TimelineClient> appToClientMap;
+ private final Map<ApplicationId, TimelineV2Client> appToClientMap;
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
@@ -102,7 +102,7 @@ public class NMTimelinePublisher extends CompositeService {
}
@VisibleForTesting
- Map<ApplicationId, TimelineClient> getAppToClientMap() {
+ Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
return appToClientMap;
}
@@ -145,7 +145,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@@ -234,7 +234,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@@ -265,7 +265,7 @@ public class NMTimelinePublisher extends CompositeService {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntities(entity);
} else {
@@ -382,8 +382,8 @@ public class NMTimelinePublisher extends CompositeService {
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
- TimelineClient timelineClient =
- TimelineClient.createTimelineClient(appId);
+ TimelineV2Client timelineClient =
+ TimelineV2Client.createTimelineClient(appId);
timelineClient.init(getConfig());
timelineClient.start();
appToClientMap.put(appId, timelineClient);
@@ -391,7 +391,7 @@ public class NMTimelinePublisher extends CompositeService {
}
public void stopTimelineClient(ApplicationId appId) {
- TimelineClient client = appToClientMap.remove(appId);
+ TimelineV2Client client = appToClientMap.remove(appId);
if (client != null) {
client.stop();
}
@@ -399,13 +399,13 @@ public class NMTimelinePublisher extends CompositeService {
public void setTimelineServiceAddress(ApplicationId appId,
String collectorAddr) {
- TimelineClient client = appToClientMap.get(appId);
+ TimelineV2Client client = appToClientMap.get(appId);
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
- private TimelineClient getTimelineClient(ApplicationId appId) {
+ private TimelineV2Client getTimelineClient(ApplicationId appId) {
return appToClientMap.get(appId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index ae9397a..e116122 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
- final DummyTimelineClient timelineClient = new DummyTimelineClient();
+ final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
@@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
}
}
- protected static class DummyTimelineClient extends TimelineClientImpl {
+ protected static class DummyTimelineClient extends TimelineV2ClientImpl {
+ public DummyTimelineClient(ApplicationId appId) {
+ super(appId);
+ }
+
private TimelineEntity[] lastPublishedEntities;
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 3ec222f..07058f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
@@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutEntities() throws Exception {
- TimelineClient client =
- TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+ TimelineV2Client client =
+ TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
@@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutExtendedEntities() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
- TimelineClient client =
- TimelineClient.createTimelineClient(appId);
+ TimelineV2Client client =
+ TimelineV2Client.createTimelineClient(appId);
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and
TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed
by Naganarasimha G R.
Posted by sj...@apache.org.
YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4fa1afdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4fa1afdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4fa1afdb
Branch: refs/heads/trunk
Commit: 4fa1afdb883dab8786d2fb5c72a195dd2e87d711
Parents: 5690b51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Feb 16 11:41:04 2017 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Feb 16 11:41:04 2017 -0800
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 57 +-
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 14 +-
.../v2/app/rm/RMContainerAllocator.java | 4 +-
.../jobhistory/TestJobHistoryEventHandler.java | 8 +-
.../distributedshell/ApplicationMaster.java | 98 ++-
.../hadoop/yarn/client/api/AMRMClient.java | 40 +-
.../yarn/client/api/async/AMRMClientAsync.java | 21 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 5 +-
.../yarn/client/api/impl/YarnClientImpl.java | 15 +-
.../hadoop/yarn/client/api/TimelineClient.java | 94 +--
.../yarn/client/api/TimelineV2Client.java | 92 +++
.../client/api/impl/TimelineClientImpl.java | 825 ++-----------------
.../yarn/client/api/impl/TimelineConnector.java | 440 ++++++++++
.../client/api/impl/TimelineV2ClientImpl.java | 459 +++++++++++
.../client/api/impl/TestTimelineClient.java | 39 +-
.../api/impl/TestTimelineClientV2Impl.java | 4 +-
.../timelineservice/NMTimelinePublisher.java | 22 +-
.../TestNMTimelinePublisher.java | 10 +-
.../TestTimelineServiceClientIntegration.java | 10 +-
19 files changed, 1272 insertions(+), 985 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 0cc605c..285d36e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
*/
public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
- private static final JsonNodeFactory FACTORY =
- new ObjectMapper().getNodeFactory();
private final AppContext context;
private final int startCount;
@@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
+ @VisibleForTesting
protected TimelineClient timelineClient;
-
- private boolean timelineServiceV2Enabled = false;
+ @VisibleForTesting
+ protected TimelineV2Client timelineV2Client;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) {
-
- timelineClient =
- ((MRAppMaster.RunningAppContext)context).getTimelineClient();
- timelineClient.init(conf);
- timelineServiceV2Enabled =
- YarnConfiguration.timelineServiceV2Enabled(conf);
+ boolean timelineServiceV2Enabled =
+ ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+ if(timelineServiceV2Enabled) {
+ timelineV2Client =
+ ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
+ timelineV2Client.init(conf);
+ } else {
+ timelineClient =
+ ((MRAppMaster.RunningAppContext) context).getTimelineClient();
+ timelineClient.init(conf);
+ }
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
} else {
@@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService
protected void serviceStart() throws Exception {
if (timelineClient != null) {
timelineClient.start();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.start();
}
eventHandlingThread = new Thread(new Runnable() {
@Override
@@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService
}
if (timelineClient != null) {
timelineClient.stop();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.stop();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
@@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
- if (timelineClient != null) {
- if (timelineServiceV2Enabled) {
- processEventForNewTimelineService(historyEvent, event.getJobID(),
- event.getTimestamp());
- } else {
- processEventForTimelineServer(historyEvent, event.getJobID(),
- event.getTimestamp());
- }
+ if (timelineV2Client != null) {
+ processEventForNewTimelineService(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ } else if (timelineClient != null) {
+ processEventForTimelineServer(historyEvent, event.getJobID(),
+ event.getTimestamp());
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
@@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService
configSize += size;
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (jobEntityForConfigs.getConfigs().size() > 0) {
- timelineClient.putEntities(jobEntityForConfigs);
- timelineClient.putEntities(appEntityForConfigs);
+ timelineV2Client.putEntities(jobEntityForConfigs);
+ timelineV2Client.putEntities(appEntityForConfigs);
jobEntityForConfigs = createJobEntity(jobId);
appEntityForConfigs = new ApplicationEntity();
appEntityForConfigs.setId(appId);
@@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
- timelineClient.putEntities(jobEntityForConfigs);
- timelineClient.putEntities(appEntityForConfigs);
+ timelineV2Client.putEntities(jobEntityForConfigs);
+ timelineV2Client.putEntities(appEntityForConfigs);
}
} catch (IOException | YarnException e) {
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
@@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService
}
try {
if (appEntityWithJobMetrics == null) {
- timelineClient.putEntitiesAsync(tEntity);
+ timelineV2Client.putEntitiesAsync(tEntity);
} else {
- timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
+ timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 835c0aa..12df83d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.crypto.KeyGenerator;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
-import javax.crypto.KeyGenerator;
-
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService {
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null;
+ private TimelineV2Client timelineV2Client = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// create new version TimelineClient
- timelineClient = TimelineClient.createTimelineClient(
+ timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
} else {
timelineClient = TimelineClient.createTimelineClient();
@@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService {
return taskAttemptFinishingMonitor;
}
- // Get Timeline Collector's address (get sync from RM)
public TimelineClient getTimelineClient() {
return timelineClient;
}
+
+ // Get Timeline Collector's address (get sync from RM)
+ public TimelineV2Client getTimelineV2Client() {
+ return timelineV2Client;
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 31bc380..1f88a2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
- && appContext.getTimelineClient() != null) {
- appContext.getTimelineClient().setTimelineServiceAddress(
+ && appContext.getTimelineV2Client() != null) {
+ appContext.getTimelineV2Client().setTimelineServiceAddress(
response.getCollectorAddr());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 0b33d6b..6c5e604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -29,8 +29,8 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.logging.Log;
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler {
if (mockContext instanceof RunningAppContext) {
when(((RunningAppContext)mockContext).getTimelineClient()).
thenReturn(TimelineClient.createTimelineClient());
+ when(((RunningAppContext) mockContext).getTimelineV2Client())
+ .thenReturn(TimelineV2Client
+ .createTimelineClient(ApplicationId.newInstance(0, 1)));
}
return mockContext;
}
@@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
protected void serviceStart() {
if (timelineClient != null) {
timelineClient.start();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.start();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 5a06ef6..4daebb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -219,7 +220,9 @@ public class ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
- private boolean timelineServiceV2 = false;
+ private boolean timelineServiceV2Enabled = false;
+
+ private boolean timelineServiceV1Enabled = false;
// App Master configuration
// No. of containers to run shell command on
@@ -293,6 +296,10 @@ public class ApplicationMaster {
// Timeline Client
@VisibleForTesting
TimelineClient timelineClient;
+
+ // Timeline v2 Client
+ private TimelineV2Client timelineV2Client;
+
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
static final String USER_TIMELINE_FILTER_NAME = "user";
@@ -556,9 +563,12 @@ public class ApplicationMaster {
"container_retry_interval", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
- timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
+ timelineServiceV2Enabled =
+ ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+ timelineServiceV1Enabled = !timelineServiceV2Enabled;
} else {
timelineClient = null;
+ timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
@@ -621,18 +631,17 @@ public class ApplicationMaster {
nmClientAsync.start();
startTimelineClient(conf);
- if (timelineServiceV2) {
+ if (timelineServiceV2Enabled) {
// need to bind timelineClient
- amRMClient.registerTimelineClient(timelineClient);
+ amRMClient.registerTimelineV2Client(timelineV2Client);
}
- if(timelineClient != null) {
- if (timelineServiceV2) {
- publishApplicationAttemptEventOnTimelineServiceV2(
- DSEvent.DS_APP_ATTEMPT_START);
- } else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
- }
+
+ if (timelineServiceV2Enabled) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_START);
+ } else if (timelineServiceV1Enabled) {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
// Setup local RPC Server to accept status requests directly from clients
@@ -704,18 +713,21 @@ public class ApplicationMaster {
public Void run() throws Exception {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
// Creating the Timeline Client
- if (timelineServiceV2) {
- timelineClient = TimelineClient.createTimelineClient(
+ if (timelineServiceV2Enabled) {
+ timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
+ timelineV2Client.init(conf);
+ timelineV2Client.start();
LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ timelineClient.start();
LOG.info("Timeline service V1 client is enabled");
}
- timelineClient.init(conf);
- timelineClient.start();
} else {
timelineClient = null;
+ timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
return null;
@@ -741,14 +753,12 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
- if (timelineClient != null) {
- if (timelineServiceV2) {
- publishApplicationAttemptEventOnTimelineServiceV2(
- DSEvent.DS_APP_ATTEMPT_END);
- } else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
- }
+ if (timelineServiceV2Enabled) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_END);
+ } else if (timelineServiceV1Enabled) {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
@@ -797,8 +807,10 @@ public class ApplicationMaster {
amRMClient.stop();
// Stop Timeline Client
- if(timelineClient != null) {
+ if(timelineServiceV1Enabled) {
timelineClient.stop();
+ } else if (timelineServiceV2Enabled) {
+ timelineV2Client.stop();
}
return success;
@@ -853,16 +865,14 @@ public class ApplicationMaster {
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
- if(timelineClient != null) {
- if (timelineServiceV2) {
- publishContainerEndEventOnTimelineServiceV2(containerStatus);
- } else {
- publishContainerEndEvent(
- timelineClient, containerStatus, domainId, appSubmitterUgi);
- }
+ if (timelineServiceV2Enabled) {
+ publishContainerEndEventOnTimelineServiceV2(containerStatus);
+ } else if (timelineServiceV1Enabled) {
+ publishContainerEndEvent(timelineClient, containerStatus, domainId,
+ appSubmitterUgi);
}
}
-
+
// ask for more containers if any failed
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
@@ -983,15 +993,13 @@ public class ApplicationMaster {
applicationMaster.nmClientAsync.getContainerStatusAsync(
containerId, container.getNodeId());
}
- if(applicationMaster.timelineClient != null) {
- if (applicationMaster.timelineServiceV2) {
- applicationMaster.publishContainerStartEventOnTimelineServiceV2(
- container);
- } else {
- applicationMaster.publishContainerStartEvent(
- applicationMaster.timelineClient, container,
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
- }
+ if (applicationMaster.timelineServiceV2Enabled) {
+ applicationMaster
+ .publishContainerStartEventOnTimelineServiceV2(container);
+ } else if (applicationMaster.timelineServiceV1Enabled) {
+ applicationMaster.publishContainerStartEvent(
+ applicationMaster.timelineClient, container,
+ applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
}
@@ -1371,7 +1379,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntities(entity);
+ timelineV2Client.putEntities(entity);
return null;
}
});
@@ -1404,7 +1412,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntities(entity);
+ timelineV2Client.putEntities(entity);
return null;
}
});
@@ -1438,7 +1446,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntitiesAsync(entity);
+ timelineV2Client.putEntitiesAsync(entity);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 15d0065..69f3777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
- private TimelineClient timelineClient;
+ private TimelineV2Client timelineV2Client;
+ private boolean timelineServiceV2Enabled;
/**
* Create a new instance of AMRMClient.
@@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
nmTokenCache = NMTokenCache.getSingleton();
}
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
+ }
+
/**
* Object to represent a single container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters
@@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
}
/**
- * Register TimelineClient to AMRMClient.
- * @param client the timeline client to register
+ * Register TimelineV2Client to AMRMClient. Writer's address for the timeline
+ * V2 client will be updated dynamically if registered.
+ *
+ * @param client the timeline v2 client to register
+ * @throws YarnException when this method is invoked even when ATS V2 is not
+ * configured.
*/
- public void registerTimelineClient(TimelineClient client) {
- this.timelineClient = client;
+ public void registerTimelineV2Client(TimelineV2Client client)
+ throws YarnException {
+ if (timelineServiceV2Enabled) {
+ timelineV2Client = client;
+ } else {
+ LOG.error("Trying to register timeline v2 client when not configured.");
+ throw new YarnException(
+ "register timeline v2 client when not configured.");
+ }
}
/**
- * Get registered timeline client.
- * @return the registered timeline client
+ * Get registered timeline v2 client.
+ * @return the registered timeline v2 client
*/
- public TimelineClient getRegisteredTimelineClient() {
- return this.timelineClient;
+ public TimelineV2Client getRegisteredTimelineV2Client() {
+ return this.timelineV2Client;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 4cb27cd..1ecfe1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.client.api.async;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
@@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -346,17 +346,20 @@ extends AbstractService {
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
+ * @throws YarnException when this method is invoked even when ATS V2 is not
+ * configured.
*/
- public void registerTimelineClient(TimelineClient timelineClient) {
- client.registerTimelineClient(timelineClient);
+ public void registerTimelineV2Client(TimelineV2Client timelineClient)
+ throws YarnException {
+ client.registerTimelineV2Client(timelineClient);
}
/**
* Get registered timeline client.
* @return the registered timeline client
*/
- public TimelineClient getRegisteredTimelineClient() {
- return client.getRegisteredTimelineClient();
+ public TimelineV2Client getRegisteredTimelineV2Client() {
+ return client.getRegisteredTimelineV2Client();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 9e2c0e5..6711da2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -326,7 +326,8 @@ extends AMRMClientAsync<T> {
AllocateResponse response = (AllocateResponse) object;
String collectorAddress = response.getCollectorAddr();
- TimelineClient timelineClient = client.getRegisteredTimelineClient();
+ TimelineV2Client timelineClient =
+ client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e406862..4a27fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
Text timelineService;
@VisibleForTesting
String timelineDTRenewer;
- protected boolean timelineServiceEnabled;
+ private boolean timelineV1ServiceEnabled;
protected boolean timelineServiceBestEffort;
private static final String ROOT = "root";
@@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
+ float timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
- timelineServiceEnabled = true;
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+ && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
+ || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+ timelineV1ServiceEnabled = true;
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf);
}
@@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
// TimelineServer which means we are able to get history information
// for applications/applicationAttempts/containers by using ahsClient
// when the TimelineServer is running.
- if (timelineServiceEnabled || conf.getBoolean(
+ if (timelineV1ServiceEnabled || conf.getBoolean(
YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
historyServiceEnabled = true;
@@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
- if (isSecurityEnabled() && timelineServiceEnabled) {
+ if (isSecurityEnabled() && timelineV1ServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index cc76718..4835239 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
import java.io.Flushable;
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* A client library that can be used to post some information in terms of a
- * number of conceptual entities.
+ * number of conceptual entities. This client library needs to be used along
+ * with Timeline V.1.x server versions.
+ * Refer {@link TimelineV2Client} for ATS V2 interface.
*/
@Public
@Evolving
-public abstract class TimelineClient extends AbstractService implements
+public abstract class TimelineClient extends CompositeService implements
Flushable {
/**
- * Create a timeline client. The current UGI when the user initialize the
- * client will be used to do the put and the delegation token operations. The
- * current user may use {@link UserGroupInformation#doAs} another user to
- * construct and initialize a timeline client if the following operations are
- * supposed to be conducted by that user.
- */
- private ApplicationId contextAppId;
-
- /**
* Creates an instance of the timeline v.1.x client.
+ * The current UGI when the user initialize the client will be used to do the
+ * put and the delegation token operations. The current user may use
+ * {@link UserGroupInformation#doAs} another user to construct and initialize
+ * a timeline client if the following operations are supposed to be conducted
+ * by that user.
*
* @return the created timeline client instance
*/
@@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
return client;
}
- /**
- * Creates an instance of the timeline v.2 client.
- *
- * @param appId the application id with which the timeline client is
- * associated
- * @return the created timeline client instance
- */
- @Public
- public static TimelineClient createTimelineClient(ApplicationId appId) {
- TimelineClient client = new TimelineClientImpl(appId);
- return client;
- }
-
- @Private
- protected TimelineClient(String name, ApplicationId appId) {
+ protected TimelineClient(String name) {
super(name);
- setContextAppId(appId);
}
/**
@@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
-
- /**
- * <p>
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is a blocking API. The method will not return
- * until all the put entities have been persisted. If this method is invoked
- * for a non-v.2 timeline client instance, a YarnException is thrown.
- * </p>
- *
- * @param entities the collection of {@link
- * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract void putEntities(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException;
-
- /**
- * <p>
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is an asynchronous API. The method will return
- * once all the entities are received. If this method is invoked for a
- * non-v.2 timeline client instance, a YarnException is thrown.
- * </p>
- *
- * @param entities the collection of {@link
- * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract void putEntitiesAsync(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException;
-
- /**
- * <p>
- * Update the timeline service address where the request will be sent to.
- * </p>
- * @param address
- * the timeline service address
- */
- public abstract void setTimelineServiceAddress(String address);
-
- protected ApplicationId getContextAppId() {
- return contextAppId;
- }
-
- protected void setContextAppId(ApplicationId appId) {
- this.contextAppId = appId;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
new file mode 100644
index 0000000..32cf1e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities. This client library needs to be used along
+ * with time line v.2 server version.
+ * Refer {@link TimelineClient} for ATS V1 interface.
+ */
+public abstract class TimelineV2Client extends CompositeService {
+ /**
+ * Creates an instance of the timeline v.2 client.
+ *
+ * @param appId the application id with which the timeline client is
+ * associated
+ * @return the created timeline client instance
+ */
+ @Public
+ public static TimelineV2Client createTimelineClient(ApplicationId appId) {
+ TimelineV2Client client = new TimelineV2ClientImpl(appId);
+ return client;
+ }
+
+ protected TimelineV2Client(String name) {
+ super(name);
+ }
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * service v.2 collector. It is a blocking API. The method will not return
+ * until all the put entities have been persisted.
+ * </p>
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putEntities(TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * service v.2 collector. It is an asynchronous API. The method will return
+ * once all the entities are received.
+ * </p>
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Update the timeline service address where the request will be sent to.
+ * </p>
+ *
+ * @param address the timeline service address
+ */
+ public abstract void setTimelineServiceAddress(String address);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 4506c48..f49618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.net.URI;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientRequest;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.client.filter.ClientFilter;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
@Private
@Evolving
@@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
- private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
- private static final Joiner JOINER = Joiner.on("");
- public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
private static Options opts;
private static final String ENTITY_DATA_TYPE = "entity";
@@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
opts.addOption("help", false, "Print usage");
}
- private Client client;
- private ConnectionConfigurator connConfigurator;
- private DelegationTokenAuthenticator authenticator;
- private DelegationTokenAuthenticatedURL.Token token;
- private UserGroupInformation authUgi;
- private String doAsUser;
- private Configuration configuration;
- private float timelineServiceVersion;
- private TimelineWriter timelineWriter;
- private SSLFactory sslFactory;
-
- private volatile String timelineServiceAddress;
-
- // Retry parameters for identifying new timeline service
- // TODO consider to merge with connection retry
- private int maxServiceRetries;
- private long serviceRetryInterval;
- private boolean timelineServiceV2 = false;
-
- @Private
@VisibleForTesting
- TimelineClientConnectionRetry connectionRetry;
-
- private TimelineEntityDispatcher entityDispatcher;
-
- // Abstract class for an operation that should be retried by timeline client
- @Private
+ protected DelegationTokenAuthenticatedURL.Token token;
@VisibleForTesting
- public static abstract class TimelineClientRetryOp {
- // The operation that should be retried
- public abstract Object run() throws IOException;
- // The method to indicate if we should retry given the incoming exception
- public abstract boolean shouldRetryOn(Exception e);
- }
-
- // Class to handle retry
- // Outside this class, only visible to tests
- @Private
+ protected UserGroupInformation authUgi;
@VisibleForTesting
- static class TimelineClientConnectionRetry {
-
- // maxRetries < 0 means keep trying
- @Private
- @VisibleForTesting
- public int maxRetries;
-
- @Private
- @VisibleForTesting
- public long retryInterval;
-
- // Indicates if retries happened last time. Only tests should read it.
- // In unit tests, retryOn() calls should _not_ be concurrent.
- private boolean retried = false;
+ protected String doAsUser;
- @Private
- @VisibleForTesting
- boolean getRetired() {
- return retried;
- }
-
- // Constructor with default retry settings
- public TimelineClientConnectionRetry(Configuration conf) {
- Preconditions.checkArgument(conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
- "%s property value should be greater than or equal to -1",
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- Preconditions
- .checkArgument(
- conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
- "%s property value should be greater than zero",
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- maxRetries = conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- retryInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- }
-
- public Object retryOn(TimelineClientRetryOp op)
- throws RuntimeException, IOException {
- int leftRetries = maxRetries;
- retried = false;
-
- // keep trying
- while (true) {
- try {
- // try perform the op, if fail, keep retrying
- return op.run();
- } catch (IOException | RuntimeException e) {
- // break if there's no retries left
- if (leftRetries == 0) {
- break;
- }
- if (op.shouldRetryOn(e)) {
- logException(e, leftRetries);
- } else {
- throw e;
- }
- }
- if (leftRetries > 0) {
- leftRetries--;
- }
- retried = true;
- try {
- // sleep for the given time interval
- Thread.sleep(retryInterval);
- } catch (InterruptedException ie) {
- LOG.warn("Client retry sleep interrupted! ");
- }
- }
- throw new RuntimeException("Failed to connect to timeline server. "
- + "Connection retries limit exceeded. "
- + "The posted timeline event may be missing");
- };
-
- private void logException(Exception e, int leftRetries) {
- if (leftRetries > 0) {
- LOG.info("Exception caught by TimelineClientConnectionRetry,"
- + " will try " + leftRetries + " more time(s).\nMessage: "
- + e.getMessage());
- } else {
- // note that maxRetries may be -1 at the very beginning
- LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
- + " will keep retrying.\nMessage: "
- + e.getMessage());
- }
- }
- }
+ private float timelineServiceVersion;
+ private TimelineWriter timelineWriter;
- private class TimelineJerseyRetryFilter extends ClientFilter {
- @Override
- public ClientResponse handle(final ClientRequest cr)
- throws ClientHandlerException {
- // Set up the retry operation
- TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
- @Override
- public Object run() {
- // Try pass the request, if fail, keep retrying
- return getNext().handle(cr);
- }
+ private String timelineServiceAddress;
- @Override
- public boolean shouldRetryOn(Exception e) {
- // Only retry on connection exceptions
- return (e instanceof ClientHandlerException)
- && (e.getCause() instanceof ConnectException ||
- e.getCause() instanceof SocketTimeoutException);
- }
- };
- try {
- return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
- } catch (IOException e) {
- throw new ClientHandlerException("Jersey retry failed!\nMessage: "
- + e.getMessage());
- }
- }
- }
+ @Private
+ @VisibleForTesting
+ TimelineConnector connector;
public TimelineClientImpl() {
- super(TimelineClientImpl.class.getName(), null);
- }
-
- public TimelineClientImpl(ApplicationId applicationId) {
- super(TimelineClientImpl.class.getName(), applicationId);
- this.timelineServiceV2 = true;
+ super(TimelineClientImpl.class.getName());
}
protected void serviceInit(Configuration conf) throws Exception {
- this.configuration = conf;
+ timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+ LOG.info("Timeline service address: " + getTimelineServiceAddress());
+ if (!YarnConfiguration.timelineServiceEnabled(conf)
+ || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
+ || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+ throw new IOException("Timeline V1 client is not properly configured. "
+ + "Either timeline service is not enabled or version is not set to"
+ + " 1.x");
+ }
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
@@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
authUgi = ugi;
doAsUser = null;
}
- ClientConfig cc = new DefaultClientConfig();
- cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
- connConfigurator = initConnConfigurator(conf);
- if (UserGroupInformation.isSecurityEnabled()) {
- authenticator = new KerberosDelegationTokenAuthenticator();
- } else {
- authenticator = new PseudoDelegationTokenAuthenticator();
- }
- authenticator.setConnectionConfigurator(connConfigurator);
token = new DelegationTokenAuthenticatedURL.Token();
+ connector = createTimelineConnector();
- connectionRetry = new TimelineClientConnectionRetry(conf);
- client = new Client(new URLConnectionClientHandler(
- new TimelineURLConnectionFactory()), cc);
- TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
- // TODO need to cleanup filter retry later.
- if (!timelineServiceV2) {
- client.addFilter(retryFilter);
- }
-
- // old version timeline service need to get address from configuration
- // while new version need to auto discovery (with retry).
- if (timelineServiceV2) {
- maxServiceRetries = conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- serviceRetryInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- entityDispatcher = new TimelineEntityDispatcher(conf);
+ if (YarnConfiguration.useHttps(conf)) {
+ timelineServiceAddress =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
- if (YarnConfiguration.useHttps(conf)) {
- setTimelineServiceAddress(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
- } else {
- setTimelineServiceAddress(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
- }
- timelineServiceVersion =
- conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
- LOG.info("Timeline service address: " + getTimelineServiceAddress());
+ timelineServiceAddress =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
super.serviceInit(conf);
}
+ @VisibleForTesting
+ protected TimelineConnector createTimelineConnector() {
+ TimelineConnector newConnector =
+ new TimelineConnector(true, authUgi, doAsUser, token);
+ addIfService(newConnector);
+ return newConnector;
+ }
+
@Override
protected void serviceStart() throws Exception {
- if (timelineServiceV2) {
- entityDispatcher.start();
- } else {
- timelineWriter = createTimelineWriter(configuration, authUgi, client,
- constructResURI(getConfig(), timelineServiceAddress, false));
- }
+ timelineWriter = createTimelineWriter(getConfig(), authUgi,
+ connector.getClient(), TimelineConnector.constructResURI(getConfig(),
+ timelineServiceAddress, RESOURCE_URI_STR_V1));
}
protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
- if (timelineServiceV2) {
- entityDispatcher.stop();
- }
- if (this.sslFactory != null) {
- this.sslFactory.destroy();
- }
super.serviceStop();
}
@@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
}
@Override
- public TimelinePutResponse putEntities(
- TimelineEntity... entities) throws IOException, YarnException {
+ public TimelinePutResponse putEntities(TimelineEntity... entities)
+ throws IOException, YarnException {
return timelineWriter.putEntities(entities);
}
@Override
- public void putEntities(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException {
- if (!timelineServiceV2) {
- throw new YarnException("v.2 method is invoked on a v.1.x client");
- }
- entityDispatcher.dispatchEntities(true, entities);
- }
-
- @Override
- public void putEntitiesAsync(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException {
- if (!timelineServiceV2) {
- throw new YarnException("v.2 method is invoked on a v.1.x client");
- }
- entityDispatcher.dispatchEntities(false, entities);
- }
-
- @Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
timelineWriter.putDomain(domain);
}
- // Used for new timeline service only
- @Private
- protected void putObjects(String path, MultivaluedMap<String, String> params,
- Object obj) throws IOException, YarnException {
-
- int retries = verifyRestEndPointAvailable();
-
- // timelineServiceAddress could be stale, add retry logic here.
- boolean needRetry = true;
- while (needRetry) {
- try {
- URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
- putObjects(uri, path, params, obj);
- needRetry = false;
- } catch (IOException e) {
- // handle exception for timelineServiceAddress being updated.
- checkRetryWithSleep(retries, e);
- retries--;
- }
- }
- }
-
- private int verifyRestEndPointAvailable() throws YarnException {
- // timelineServiceAddress could haven't be initialized yet
- // or stale (only for new timeline service)
- int retries = pollTimelineServiceAddress(this.maxServiceRetries);
- if (timelineServiceAddress == null) {
- String errMessage = "TimelineClient has reached to max retry times : "
- + this.maxServiceRetries
- + ", but failed to fetch timeline service address. Please verify"
- + " Timeline Auxiliary Service is configured in all the NMs";
- LOG.error(errMessage);
- throw new YarnException(errMessage);
- }
- return retries;
- }
-
- /**
- * Check if reaching to maximum of retries.
- * @param retries
- * @param e
- */
- private void checkRetryWithSleep(int retries, IOException e)
- throws YarnException, IOException {
- if (retries > 0) {
- try {
- Thread.sleep(this.serviceRetryInterval);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while retrying to connect to ATS");
- }
- } else {
- StringBuilder msg =
- new StringBuilder("TimelineClient has reached to max retry times : ");
- msg.append(this.maxServiceRetries);
- msg.append(" for service address: ");
- msg.append(timelineServiceAddress);
- LOG.error(msg.toString());
- throw new IOException(msg.toString(), e);
- }
- }
-
- protected void putObjects(
- URI base, String path, MultivaluedMap<String, String> params, Object obj)
- throws IOException, YarnException {
- ClientResponse resp;
- try {
- resp = client.resource(base).path(path).queryParams(params)
- .accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .put(ClientResponse.class, obj);
- } catch (RuntimeException re) {
- // runtime exception is expected if the client cannot connect the server
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg, re);
- throw new IOException(re);
- }
- if (resp == null ||
- resp.getStatusInfo().getStatusCode() !=
- ClientResponse.Status.OK.getStatusCode()) {
- String msg = "Response from the timeline server is " +
- ((resp == null) ? "null":
- "not successful," + " HTTP error code: " + resp.getStatus()
- + ", Server response:\n" + resp.getEntity(String.class));
- LOG.error(msg);
- throw new YarnException(msg);
- }
- }
-
- @Override
- public void setTimelineServiceAddress(String address) {
- this.timelineServiceAddress = address;
- }
-
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// TODO we should add retry logic here if timelineServiceAddress is
// not available immediately.
return (Token) authUrl.getDelegationToken(
- constructResURI(getConfig(),
- getTimelineServiceAddress(), false).toURL(),
+ TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
token, renewer, doAsUser);
}
};
- return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
+ return (Token<TimelineDelegationTokenIdentifier>) connector
+ .operateDelegationToken(getDTAction);
}
@SuppressWarnings("unchecked")
@@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
- final URI serviceURI = isTokenServiceAddrEmpty ?
- constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ final URI serviceURI = isTokenServiceAddrEmpty
+ ? TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR_V1, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
};
- return (Long) operateDelegationToken(renewDTAction);
+ return (Long) connector.operateDelegationToken(renewDTAction);
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
- throws IOException, YarnException {
+ throws IOException, YarnException {
final boolean isTokenServiceAddrEmpty =
timelineDT.getService().toString().isEmpty();
final String scheme = isTokenServiceAddrEmpty ? null
@@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
- final URI serviceURI = isTokenServiceAddrEmpty ?
- constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ final URI serviceURI = isTokenServiceAddrEmpty
+ ? TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR_V1, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
};
- operateDelegationToken(cancelDTAction);
+ connector.operateDelegationToken(cancelDTAction);
}
@Override
public String toString() {
return super.toString() + " with timeline server "
- + constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ + TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
+ " and writer " + timelineWriter;
}
- private Object operateDelegationToken(
- final PrivilegedExceptionAction<?> action)
- throws IOException, YarnException {
- // Set up the retry operation
- TimelineClientRetryOp tokenRetryOp =
- createTimelineClientRetryOpForOperateDelegationToken(action);
-
- return connectionRetry.retryOn(tokenRetryOp);
- }
-
- /**
- * Poll TimelineServiceAddress for maximum of retries times if it is null.
- *
- * @param retries
- * @return the left retry times
- * @throws IOException
- */
- private int pollTimelineServiceAddress(int retries) throws YarnException {
- while (timelineServiceAddress == null && retries > 0) {
- try {
- Thread.sleep(this.serviceRetryInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while trying to connect ATS");
- }
- retries--;
- }
- return retries;
- }
-
- private class TimelineURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
- authUgi.checkTGTAndReloginFromKeytab();
- try {
- return new DelegationTokenAuthenticatedURL(
- authenticator, connConfigurator).openConnection(url, token,
- doAsUser);
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (AuthenticationException ae) {
- throw new IOException(ae);
- }
- }
-
- }
-
- private ConnectionConfigurator initConnConfigurator(Configuration conf) {
- try {
- return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
- } catch (Exception e) {
- LOG.debug("Cannot load customized ssl related configuration. " +
- "Fallback to system-generic settings.", e);
- return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
- }
- }
-
- private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
- new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
- return conn;
- }
- };
-
- private ConnectionConfigurator initSslConnConfigurator(final int timeout,
- Configuration conf) throws IOException, GeneralSecurityException {
- final SSLSocketFactory sf;
- final HostnameVerifier hv;
-
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- sslFactory.init();
- sf = sslFactory.createSSLSocketFactory();
- hv = sslFactory.getHostnameVerifier();
-
- return new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection c = (HttpsURLConnection) conn;
- c.setSSLSocketFactory(sf);
- c.setHostnameVerifier(hv);
- }
- setTimeouts(conn, timeout);
- return conn;
- }
- };
- }
-
- private static void setTimeouts(URLConnection connection, int socketTimeout) {
- connection.setConnectTimeout(socketTimeout);
- connection.setReadTimeout(socketTimeout);
- }
-
- private static URI constructResURI(
- Configuration conf, String address, boolean v2) {
- return URI.create(
- JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
- address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
- }
-
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {
@@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
-
- @Private
- @VisibleForTesting
- public TimelineClientRetryOp
- createTimelineClientRetryOpForOperateDelegationToken(
- final PrivilegedExceptionAction<?> action) throws IOException {
- return new TimelineClientRetryOpForOperateDelegationToken(
- this.authUgi, action);
- }
-
- @Private
- @VisibleForTesting
- public class TimelineClientRetryOpForOperateDelegationToken
- extends TimelineClientRetryOp {
-
- private final UserGroupInformation authUgi;
- private final PrivilegedExceptionAction<?> action;
-
- public TimelineClientRetryOpForOperateDelegationToken(
- UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
- this.authUgi = authUgi;
- this.action = action;
- }
-
- @Override
- public Object run() throws IOException {
- // Try pass the request, if fail, keep retrying
- authUgi.checkTGTAndReloginFromKeytab();
- try {
- return authUgi.doAs(action);
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean shouldRetryOn(Exception e) {
- // retry on connection exceptions
- // and SocketTimeoutException
- return (e instanceof ConnectException
- || e instanceof SocketTimeoutException);
- }
- }
-
- private final class EntitiesHolder extends FutureTask<Void> {
- private final
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities;
- private final boolean isSync;
-
- EntitiesHolder(
- final
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities,
- final boolean isSync) {
- super(new Callable<Void>() {
- // publishEntities()
- public Void call() throws Exception {
- MultivaluedMap<String, String> params = new MultivaluedMapImpl();
- params.add("appid", getContextAppId().toString());
- params.add("async", Boolean.toString(!isSync));
- putObjects("entities", params, entities);
- return null;
- }
- });
- this.entities = entities;
- this.isSync = isSync;
- }
-
- public boolean isSync() {
- return isSync;
- }
-
- public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- getEntities() {
- return entities;
- }
- }
-
- /**
- * This class is responsible for collecting the timeline entities and
- * publishing them in async.
- */
- private class TimelineEntityDispatcher {
- /**
- * Time period for which the timelineclient will wait for draining after
- * stop.
- */
- private static final long DRAIN_TIME_PERIOD = 2000L;
-
- private int numberOfAsyncsToMerge;
- private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
- private ExecutorService executor;
-
- TimelineEntityDispatcher(Configuration conf) {
- timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
- numberOfAsyncsToMerge =
- conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
- YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
- }
-
- Runnable createRunnable() {
- return new Runnable() {
- @Override
- public void run() {
- try {
- EntitiesHolder entitiesHolder;
- while (!Thread.currentThread().isInterrupted()) {
- // Merge all the async calls and make one push, but if its sync
- // call push immediately
- try {
- entitiesHolder = timelineEntityQueue.take();
- } catch (InterruptedException ie) {
- LOG.info("Timeline dispatcher thread was interrupted ");
- Thread.currentThread().interrupt();
- return;
- }
- if (entitiesHolder != null) {
- publishWithoutBlockingOnQueue(entitiesHolder);
- }
- }
- } finally {
- if (!timelineEntityQueue.isEmpty()) {
- LOG.info("Yet to publish " + timelineEntityQueue.size()
- + " timelineEntities, draining them now. ");
- }
- // Try to drain the remaining entities to be published @ the max for
- // 2 seconds
- long timeTillweDrain =
- System.currentTimeMillis() + DRAIN_TIME_PERIOD;
- while (!timelineEntityQueue.isEmpty()) {
- publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
- if (System.currentTimeMillis() > timeTillweDrain) {
- // time elapsed stop publishing further....
- if (!timelineEntityQueue.isEmpty()) {
- LOG.warn("Time to drain elapsed! Remaining "
- + timelineEntityQueue.size() + "timelineEntities will not"
- + " be published");
- // if some entities were not drained then we need interrupt
- // the threads which had put sync EntityHolders to the queue.
- EntitiesHolder nextEntityInTheQueue = null;
- while ((nextEntityInTheQueue =
- timelineEntityQueue.poll()) != null) {
- nextEntityInTheQueue.cancel(true);
- }
- }
- break;
- }
- }
- }
- }
-
- /**
- * Publishes the given EntitiesHolder and return immediately if sync
- * call, else tries to fetch the EntitiesHolder from the queue in non
- * blocking fashion and collate the Entities if possible before
- * publishing through REST.
- *
- * @param entitiesHolder
- */
- private void publishWithoutBlockingOnQueue(
- EntitiesHolder entitiesHolder) {
- if (entitiesHolder.isSync()) {
- entitiesHolder.run();
- return;
- }
- int count = 1;
- while (true) {
- // loop till we find a sync put Entities or there is nothing
- // to take
- EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
- if (nextEntityInTheQueue == null) {
- // Nothing in the queue just publish and get back to the
- // blocked wait state
- entitiesHolder.run();
- break;
- } else if (nextEntityInTheQueue.isSync()) {
- // flush all the prev async entities first
- entitiesHolder.run();
- // and then flush the sync entity
- nextEntityInTheQueue.run();
- break;
- } else {
- // append all async entities together and then flush
- entitiesHolder.getEntities().addEntities(
- nextEntityInTheQueue.getEntities().getEntities());
- count++;
- if (count == numberOfAsyncsToMerge) {
- // Flush the entities if the number of the async
- // putEntites merged reaches the desired limit. To avoid
- // collecting multiple entities and delaying for a long
- // time.
- entitiesHolder.run();
- break;
- }
- }
- }
- }
- };
- }
-
- public void dispatchEntities(boolean sync,
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
- entitiesTobePublished) throws YarnException {
- if (executor.isShutdown()) {
- throw new YarnException("Timeline client is in the process of stopping,"
- + " not accepting any more TimelineEntities");
- }
-
- // wrap all TimelineEntity into TimelineEntities object
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities =
- new org.apache.hadoop.yarn.api.records.timelineservice.
- TimelineEntities();
- for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
- entity : entitiesTobePublished) {
- entities.addEntity(entity);
- }
-
- // created a holder and place it in queue
- EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
- try {
- timelineEntityQueue.put(entitiesHolder);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException(
- "Failed while adding entity to the queue for publishing", e);
- }
-
- if (sync) {
- // In sync call we need to wait till its published and if any error then
- // throw it back
- try {
- entitiesHolder.get();
- } catch (ExecutionException e) {
- throw new YarnException("Failed while publishing entity",
- e.getCause());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while publishing entity", e);
- }
- }
- }
-
- public void start() {
- executor = Executors.newSingleThreadExecutor();
- executor.execute(createRunnable());
- }
-
- public void stop() {
- LOG.info("Stopping TimelineClient.");
- executor.shutdownNow();
- try {
- executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org