You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2017/02/17 02:58:41 UTC
[3/4] hadoop git commit: YARN-4675. Reorganize TimelineClient and
TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed
by Naganarasimha G R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
new file mode 100644
index 0000000..b5b5f77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientRequest;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+/**
+ * Utility Connector class which is used by timeline clients to securely get
+ * connected to the timeline server.
+ *
+ */
+public class TimelineConnector extends AbstractService {
+
+ private static final Joiner JOINER = Joiner.on("");
+ private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
+ public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+
+ private SSLFactory sslFactory;
+ private Client client;
+ private ConnectionConfigurator connConfigurator;
+ private DelegationTokenAuthenticator authenticator;
+ private DelegationTokenAuthenticatedURL.Token token;
+ private UserGroupInformation authUgi;
+ private String doAsUser;
+ @VisibleForTesting
+ TimelineClientConnectionRetry connectionRetry;
+ private boolean requireConnectionRetry;
+
+ public TimelineConnector(boolean requireConnectionRetry,
+ UserGroupInformation authUgi, String doAsUser,
+ DelegationTokenAuthenticatedURL.Token token) {
+ super("TimelineConnector");
+ this.requireConnectionRetry = requireConnectionRetry;
+ this.authUgi = authUgi;
+ this.doAsUser = doAsUser;
+ this.token = token;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ ClientConfig cc = new DefaultClientConfig();
+ cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+
+ sslFactory = getSSLFactory(conf);
+ connConfigurator = getConnConfigurator(sslFactory);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ authenticator = new KerberosDelegationTokenAuthenticator();
+ } else {
+ authenticator = new PseudoDelegationTokenAuthenticator();
+ }
+ authenticator.setConnectionConfigurator(connConfigurator);
+
+ connectionRetry = new TimelineClientConnectionRetry(conf);
+ client =
+ new Client(
+ new URLConnectionClientHandler(new TimelineURLConnectionFactory(
+ authUgi, authenticator, connConfigurator, token, doAsUser)),
+ cc);
+ if (requireConnectionRetry) {
+ TimelineJerseyRetryFilter retryFilter =
+ new TimelineJerseyRetryFilter(connectionRetry);
+ client.addFilter(retryFilter);
+ }
+ }
+
+ private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
+ = new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+ return conn;
+ }
+ };
+
+ private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
+ try {
+ return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
+ } catch (Exception e) {
+ LOG.debug("Cannot load customized ssl related configuration. "
+ + "Fallback to system-generic settings.", e);
+ return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+ }
+ }
+
+ private static ConnectionConfigurator initSslConnConfigurator(
+ final int timeout, SSLFactory sslFactory)
+ throws IOException, GeneralSecurityException {
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ sf = sslFactory.createSSLSocketFactory();
+ hv = sslFactory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ if (conn instanceof HttpsURLConnection) {
+ HttpsURLConnection c = (HttpsURLConnection) conn;
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ }
+ setTimeouts(conn, timeout);
+ return conn;
+ }
+ };
+ }
+
+ protected SSLFactory getSSLFactory(Configuration conf)
+ throws GeneralSecurityException, IOException {
+ SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ newSSLFactory.init();
+ return newSSLFactory;
+ }
+
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
+ connection.setConnectTimeout(socketTimeout);
+ connection.setReadTimeout(socketTimeout);
+ }
+
+ public static URI constructResURI(Configuration conf, String address,
+ String uri) {
+ return URI.create(
+ JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+ address, uri));
+ }
+
+ DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
+ return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+ }
+
+ protected void serviceStop() {
+ if (this.sslFactory != null) {
+ this.sslFactory.destroy();
+ }
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ public Object operateDelegationToken(
+ final PrivilegedExceptionAction<?> action)
+ throws IOException, YarnException {
+ // Set up the retry operation
+ TimelineClientRetryOp tokenRetryOp =
+ createRetryOpForOperateDelegationToken(action);
+
+ return connectionRetry.retryOn(tokenRetryOp);
+ }
+
+ @Private
+ @VisibleForTesting
+ TimelineClientRetryOp createRetryOpForOperateDelegationToken(
+ final PrivilegedExceptionAction<?> action) throws IOException {
+ return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
+ action);
+ }
+
+ /**
+ * Abstract class for an operation that should be retried by timeline client.
+ */
+ @Private
+ @VisibleForTesting
+ public static abstract class TimelineClientRetryOp {
+ // The operation that should be retried
+ public abstract Object run() throws IOException;
+
+ // The method to indicate if we should retry given the incoming exception
+ public abstract boolean shouldRetryOn(Exception e);
+ }
+
+ private static class TimelineURLConnectionFactory
+ implements HttpURLConnectionFactory {
+ private DelegationTokenAuthenticator authenticator;
+ private UserGroupInformation authUgi;
+ private ConnectionConfigurator connConfigurator;
+ private Token token;
+ private String doAsUser;
+
+ public TimelineURLConnectionFactory(UserGroupInformation authUgi,
+ DelegationTokenAuthenticator authenticator,
+ ConnectionConfigurator connConfigurator,
+ DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
+ this.authUgi = authUgi;
+ this.authenticator = authenticator;
+ this.connConfigurator = connConfigurator;
+ this.token = token;
+ this.doAsUser = doAsUser;
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url)
+ throws IOException {
+ authUgi.checkTGTAndReloginFromKeytab();
+ try {
+ return new DelegationTokenAuthenticatedURL(authenticator,
+ connConfigurator).openConnection(url, token, doAsUser);
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (AuthenticationException ae) {
+ throw new IOException(ae);
+ }
+ }
+
+ }
+
+ // Class to handle retry
+ // Outside this class, only visible to tests
+ @Private
+ @VisibleForTesting
+ static class TimelineClientConnectionRetry {
+
+ // maxRetries < 0 means keep trying
+ @Private
+ @VisibleForTesting
+ public int maxRetries;
+
+ @Private
+ @VisibleForTesting
+ public long retryInterval;
+
+ // Indicates if retries happened last time. Only tests should read it.
+ // In unit tests, retryOn() calls should _not_ be concurrent.
+ private boolean retried = false;
+
+ @Private
+ @VisibleForTesting
+ boolean getRetired() {
+ return retried;
+ }
+
+ // Constructor with default retry settings
+ public TimelineClientConnectionRetry(Configuration conf) {
+ Preconditions.checkArgument(
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES)
+ >= -1,
+ "%s property value should be greater than or equal to -1",
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ Preconditions.checkArgument(
+ conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.
+ DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
+ "%s property value should be greater than zero",
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ maxRetries =
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ retryInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ }
+
+ public Object retryOn(TimelineClientRetryOp op)
+ throws RuntimeException, IOException {
+ int leftRetries = maxRetries;
+ retried = false;
+
+ // keep trying
+ while (true) {
+ try {
+ // try perform the op, if fail, keep retrying
+ return op.run();
+ } catch (IOException | RuntimeException e) {
+ // break if there's no retries left
+ if (leftRetries == 0) {
+ break;
+ }
+ if (op.shouldRetryOn(e)) {
+ logException(e, leftRetries);
+ } else {
+ throw e;
+ }
+ }
+ if (leftRetries > 0) {
+ leftRetries--;
+ }
+ retried = true;
+ try {
+ // sleep for the given time interval
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ie) {
+ LOG.warn("Client retry sleep interrupted! ");
+ }
+ }
+ throw new RuntimeException("Failed to connect to timeline server. "
+ + "Connection retries limit exceeded. "
+ + "The posted timeline event may be missing");
+ };
+
+ private void logException(Exception e, int leftRetries) {
+ if (leftRetries > 0) {
+ LOG.info(
+ "Exception caught by TimelineClientConnectionRetry," + " will try "
+ + leftRetries + " more time(s).\nMessage: " + e.getMessage());
+ } else {
+ // note that maxRetries may be -1 at the very beginning
+ LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+ + " will keep retrying.\nMessage: " + e.getMessage());
+ }
+ }
+ }
+
+ private static class TimelineJerseyRetryFilter extends ClientFilter {
+ private TimelineClientConnectionRetry connectionRetry;
+
+ public TimelineJerseyRetryFilter(
+ TimelineClientConnectionRetry connectionRetry) {
+ this.connectionRetry = connectionRetry;
+ }
+
+ @Override
+ public ClientResponse handle(final ClientRequest cr)
+ throws ClientHandlerException {
+ // Set up the retry operation
+ TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
+ @Override
+ public Object run() {
+ // Try pass the request, if fail, keep retrying
+ return getNext().handle(cr);
+ }
+
+ @Override
+ public boolean shouldRetryOn(Exception e) {
+ // Only retry on connection exceptions
+ return (e instanceof ClientHandlerException)
+ && (e.getCause() instanceof ConnectException
+ || e.getCause() instanceof SocketTimeoutException);
+ }
+ };
+ try {
+ return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+ } catch (IOException e) {
+ throw new ClientHandlerException(
+ "Jersey retry failed!\nMessage: " + e.getMessage());
+ }
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public static class TimelineClientRetryOpForOperateDelegationToken
+ extends TimelineClientRetryOp {
+
+ private final UserGroupInformation authUgi;
+ private final PrivilegedExceptionAction<?> action;
+
+ public TimelineClientRetryOpForOperateDelegationToken(
+ UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
+ this.authUgi = authUgi;
+ this.action = action;
+ }
+
+ @Override
+ public Object run() throws IOException {
+ // Try pass the request, if fail, keep retrying
+ authUgi.checkTGTAndReloginFromKeytab();
+ try {
+ return authUgi.doAs(action);
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean shouldRetryOn(Exception e) {
+ // retry on connection exceptions
+ // and SocketTimeoutException
+ return (e instanceof ConnectException
+ || e instanceof SocketTimeoutException);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
new file mode 100644
index 0000000..848e238
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ * Implementation of timeline v2 client interface.
+ *
+ */
+public class TimelineV2ClientImpl extends TimelineV2Client {
+ private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
+
+ private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+
+ private TimelineEntityDispatcher entityDispatcher;
+ private volatile String timelineServiceAddress;
+
+ // Retry parameters for identifying new timeline service
+ // TODO consider to merge with connection retry
+ private int maxServiceRetries;
+ private long serviceRetryInterval;
+
+ private TimelineConnector connector;
+
+ private ApplicationId contextAppId;
+
+ public TimelineV2ClientImpl(ApplicationId appId) {
+ super(TimelineV2ClientImpl.class.getName());
+ this.contextAppId = appId;
+ }
+
+ public ApplicationId getContextAppId() {
+ return contextAppId;
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ if (!YarnConfiguration.timelineServiceEnabled(conf)
+ || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+ throw new IOException("Timeline V2 client is not properly configured. "
+ + "Either timeline service is not enabled or version is not set to"
+ + " 2");
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUgi = ugi.getRealUser();
+ String doAsUser = null;
+ UserGroupInformation authUgi = null;
+ if (realUgi != null) {
+ authUgi = realUgi;
+ doAsUser = ugi.getShortUserName();
+ } else {
+ authUgi = ugi;
+ doAsUser = null;
+ }
+
+ // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
+ DelegationTokenAuthenticatedURL.Token token =
+ new DelegationTokenAuthenticatedURL.Token();
+ connector = new TimelineConnector(false, authUgi, doAsUser, token);
+ addIfService(connector);
+
+ // new version need to auto discovery (with retry till ATS v2 address is
+ // got).
+ maxServiceRetries =
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+ serviceRetryInterval = conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+ entityDispatcher = new TimelineEntityDispatcher(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ entityDispatcher.start();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ entityDispatcher.stop();
+ super.serviceStop();
+ }
+
+ @Override
+ public void putEntities(TimelineEntity... entities)
+ throws IOException, YarnException {
+ entityDispatcher.dispatchEntities(true, entities);
+ }
+
+ @Override
+ public void putEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException {
+ entityDispatcher.dispatchEntities(false, entities);
+ }
+
+ @Override
+ public void setTimelineServiceAddress(String address) {
+ this.timelineServiceAddress = address;
+ }
+
+ @Private
+ protected void putObjects(String path, MultivaluedMap<String, String> params,
+ Object obj) throws IOException, YarnException {
+
+ int retries = verifyRestEndPointAvailable();
+
+ // timelineServiceAddress could be stale, add retry logic here.
+ boolean needRetry = true;
+ while (needRetry) {
+ try {
+ URI uri = TimelineConnector.constructResURI(getConfig(),
+ timelineServiceAddress, RESOURCE_URI_STR_V2);
+ putObjects(uri, path, params, obj);
+ needRetry = false;
+ } catch (IOException e) {
+ // handle exception for timelineServiceAddress being updated.
+ checkRetryWithSleep(retries, e);
+ retries--;
+ }
+ }
+ }
+
+ /**
+ * Check if reaching to maximum of retries.
+ *
+ * @param retries
+ * @param e
+ */
+ private void checkRetryWithSleep(int retries, IOException e)
+ throws YarnException, IOException {
+ if (retries > 0) {
+ try {
+ Thread.sleep(this.serviceRetryInterval);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while retrying to connect to ATS");
+ }
+ } else {
+ StringBuilder msg =
+ new StringBuilder("TimelineClient has reached to max retry times : ");
+ msg.append(this.maxServiceRetries);
+ msg.append(" for service address: ");
+ msg.append(timelineServiceAddress);
+ LOG.error(msg.toString());
+ throw new IOException(msg.toString(), e);
+ }
+ }
+
+ protected void putObjects(URI base, String path,
+ MultivaluedMap<String, String> params, Object obj)
+ throws IOException, YarnException {
+ ClientResponse resp;
+ try {
+ resp = connector.getClient().resource(base).path(path).queryParams(params)
+ .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, obj);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg = "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw new IOException(re);
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Response from the timeline server is " + ((resp == null) ? "null"
+ : "not successful," + " HTTP error code: " + resp.getStatus()
+ + ", Server response:\n" + resp.getEntity(String.class));
+ LOG.error(msg);
+ throw new YarnException(msg);
+ }
+ }
+
+ private int verifyRestEndPointAvailable() throws YarnException {
+ // timelineServiceAddress could haven't be initialized yet
+ // or stale (only for new timeline service)
+ int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+ if (timelineServiceAddress == null) {
+ String errMessage = "TimelineClient has reached to max retry times : "
+ + this.maxServiceRetries
+ + ", but failed to fetch timeline service address. Please verify"
+ + " Timeline Auxiliary Service is configured in all the NMs";
+ LOG.error(errMessage);
+ throw new YarnException(errMessage);
+ }
+ return retries;
+ }
+
+ /**
+ * Poll TimelineServiceAddress for maximum of retries times if it is null.
+ *
+ * @param retries
+ * @return the left retry times
+ * @throws IOException
+ */
+ private int pollTimelineServiceAddress(int retries) throws YarnException {
+ while (timelineServiceAddress == null && retries > 0) {
+ try {
+ Thread.sleep(this.serviceRetryInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while trying to connect ATS");
+ }
+ retries--;
+ }
+ return retries;
+ }
+
+ private final class EntitiesHolder extends FutureTask<Void> {
+ private final TimelineEntities entities;
+ private final boolean isSync;
+
+ EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+ super(new Callable<Void>() {
+ // publishEntities()
+ public Void call() throws Exception {
+ MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+ params.add("appid", getContextAppId().toString());
+ params.add("async", Boolean.toString(!isSync));
+ putObjects("entities", params, entities);
+ return null;
+ }
+ });
+ this.entities = entities;
+ this.isSync = isSync;
+ }
+
+ public boolean isSync() {
+ return isSync;
+ }
+
+ public TimelineEntities getEntities() {
+ return entities;
+ }
+ }
+
+ /**
+ * This class is responsible for collecting the timeline entities and
+ * publishing them in async.
+ */
+ private class TimelineEntityDispatcher {
+ /**
+ * Time period for which the timelineclient will wait for draining after
+ * stop.
+ */
+ private static final long DRAIN_TIME_PERIOD = 2000L;
+
+ private int numberOfAsyncsToMerge;
+ private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+ private ExecutorService executor;
+
+ TimelineEntityDispatcher(Configuration conf) {
+ timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+ numberOfAsyncsToMerge =
+ conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+ YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+ }
+
+ Runnable createRunnable() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ EntitiesHolder entitiesHolder;
+ while (!Thread.currentThread().isInterrupted()) {
+ // Merge all the async calls and make one push, but if its sync
+ // call push immediately
+ try {
+ entitiesHolder = timelineEntityQueue.take();
+ } catch (InterruptedException ie) {
+ LOG.info("Timeline dispatcher thread was interrupted ");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (entitiesHolder != null) {
+ publishWithoutBlockingOnQueue(entitiesHolder);
+ }
+ }
+ } finally {
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.info("Yet to publish " + timelineEntityQueue.size()
+ + " timelineEntities, draining them now. ");
+ }
+ // Try to drain the remaining entities to be published @ the max for
+ // 2 seconds
+ long timeTillweDrain =
+ System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+ while (!timelineEntityQueue.isEmpty()) {
+ publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+ if (System.currentTimeMillis() > timeTillweDrain) {
+ // time elapsed stop publishing further....
+ if (!timelineEntityQueue.isEmpty()) {
+ LOG.warn("Time to drain elapsed! Remaining "
+ + timelineEntityQueue.size() + "timelineEntities will not"
+ + " be published");
+ // if some entities were not drained then we need interrupt
+ // the threads which had put sync EntityHolders to the queue.
+ EntitiesHolder nextEntityInTheQueue = null;
+ while ((nextEntityInTheQueue =
+ timelineEntityQueue.poll()) != null) {
+ nextEntityInTheQueue.cancel(true);
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Publishes the given EntitiesHolder and return immediately if sync
+ * call, else tries to fetch the EntitiesHolder from the queue in non
+ * blocking fashion and collate the Entities if possible before
+ * publishing through REST.
+ *
+ * @param entitiesHolder
+ */
+ private void publishWithoutBlockingOnQueue(
+ EntitiesHolder entitiesHolder) {
+ if (entitiesHolder.isSync()) {
+ entitiesHolder.run();
+ return;
+ }
+ int count = 1;
+ while (true) {
+ // loop till we find a sync put Entities or there is nothing
+ // to take
+ EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+ if (nextEntityInTheQueue == null) {
+ // Nothing in the queue just publish and get back to the
+ // blocked wait state
+ entitiesHolder.run();
+ break;
+ } else if (nextEntityInTheQueue.isSync()) {
+ // flush all the prev async entities first
+ entitiesHolder.run();
+ // and then flush the sync entity
+ nextEntityInTheQueue.run();
+ break;
+ } else {
+ // append all async entities together and then flush
+ entitiesHolder.getEntities().addEntities(
+ nextEntityInTheQueue.getEntities().getEntities());
+ count++;
+ if (count == numberOfAsyncsToMerge) {
+ // Flush the entities if the number of the async
+ // putEntites merged reaches the desired limit. To avoid
+ // collecting multiple entities and delaying for a long
+ // time.
+ entitiesHolder.run();
+ break;
+ }
+ }
+ }
+ }
+ };
+ }
+
+ public void dispatchEntities(boolean sync,
+ TimelineEntity[] entitiesTobePublished) throws YarnException {
+ if (executor.isShutdown()) {
+ throw new YarnException("Timeline client is in the process of stopping,"
+ + " not accepting any more TimelineEntities");
+ }
+
+ // wrap all TimelineEntity into TimelineEntities object
+ TimelineEntities entities = new TimelineEntities();
+ for (TimelineEntity entity : entitiesTobePublished) {
+ entities.addEntity(entity);
+ }
+
+ // created a holder and place it in queue
+ EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+ try {
+ timelineEntityQueue.put(entitiesHolder);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException(
+ "Failed while adding entity to the queue for publishing", e);
+ }
+
+ if (sync) {
+ // In sync call we need to wait till its published and if any error then
+ // throw it back
+ try {
+ entitiesHolder.get();
+ } catch (ExecutionException e) {
+ throw new YarnException("Failed while publishing entity",
+ e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new YarnException("Interrupted while publishing entity", e);
+ }
+ }
+ }
+
+ public void start() {
+ executor = Executors.newSingleThreadExecutor();
+ executor.execute(createRunnable());
+ }
+
+ public void stop() {
+ LOG.info("Stopping TimelineClient.");
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index a1d4449..a44a8ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -215,11 +215,11 @@ public class TestTimelineClient {
+ "Timeline server should be off to run this test. ");
} catch (RuntimeException ce) {
Assert.assertTrue(
- "Handler exception for reason other than retry: " + ce.getMessage(),
- ce.getMessage().contains("Connection retries limit exceeded"));
+ "Handler exception for reason other than retry: " + ce.getMessage(),
+ ce.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
- Assert.assertTrue("Retry filter didn't perform any retries! ", client
- .connectionRetry.getRetired());
+ Assert.assertTrue("Retry filter didn't perform any retries! ",
+ client.connector.connectionRetry.getRetired());
}
}
@@ -318,7 +318,7 @@ public class TestTimelineClient {
.getMessage().contains("Connection retries limit exceeded"));
// we would expect this exception here, check if the client has retried
Assert.assertTrue("Retry filter didn't perform any retries! ",
- client.connectionRetry.getRetired());
+ client.connector.connectionRetry.getRetired());
}
public static ClientResponse mockEntityClientResponse(
@@ -419,17 +419,26 @@ public class TestTimelineClient {
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
YarnConfiguration conf) {
TimelineClientImpl client = new TimelineClientImpl() {
-
@Override
- public TimelineClientRetryOp
- createTimelineClientRetryOpForOperateDelegationToken(
- final PrivilegedExceptionAction<?> action) throws IOException {
- TimelineClientRetryOpForOperateDelegationToken op =
- spy(new TimelineClientRetryOpForOperateDelegationToken(
- UserGroupInformation.getCurrentUser(), action));
- doThrow(new SocketTimeoutException("Test socketTimeoutException"))
- .when(op).run();
- return op;
+ protected TimelineConnector createTimelineConnector() {
+ TimelineConnector connector =
+ new TimelineConnector(true, authUgi, doAsUser, token) {
+ @Override
+ public TimelineClientRetryOp
+ createRetryOpForOperateDelegationToken(
+ final PrivilegedExceptionAction<?> action)
+ throws IOException {
+ TimelineClientRetryOpForOperateDelegationToken op =
+ spy(new TimelineClientRetryOpForOperateDelegationToken(
+ UserGroupInformation.getCurrentUser(), action));
+ doThrow(
+ new SocketTimeoutException("Test socketTimeoutException"))
+ .when(op).run();
+ return op;
+ }
+ };
+ addIfService(connector);
+ return connector;
}
};
client.init(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index 5813340..c5b02fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
if (!currTestName.getMethodName()
.contains("testRetryOnConnectionFailure")) {
@@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
}
private class TestV2TimelineClientForExceptionHandling
- extends TimelineClientImpl {
+ extends TimelineV2ClientImpl {
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
super(id);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 45b9213..851ba53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -77,7 +77,7 @@ public class NMTimelinePublisher extends CompositeService {
private String httpAddress;
- private final Map<ApplicationId, TimelineClient> appToClientMap;
+ private final Map<ApplicationId, TimelineV2Client> appToClientMap;
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
@@ -103,7 +103,7 @@ public class NMTimelinePublisher extends CompositeService {
}
@VisibleForTesting
- Map<ApplicationId, TimelineClient> getAppToClientMap() {
+ Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
return appToClientMap;
}
@@ -148,7 +148,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@@ -242,7 +242,7 @@ public class NMTimelinePublisher extends CompositeService {
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
@@ -273,7 +273,7 @@ public class NMTimelinePublisher extends CompositeService {
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
- TimelineClient timelineClient = getTimelineClient(appId);
+ TimelineV2Client timelineClient = getTimelineClient(appId);
if (timelineClient != null) {
timelineClient.putEntities(entity);
} else {
@@ -390,8 +390,8 @@ public class NMTimelinePublisher extends CompositeService {
public void createTimelineClient(ApplicationId appId) {
if (!appToClientMap.containsKey(appId)) {
- TimelineClient timelineClient =
- TimelineClient.createTimelineClient(appId);
+ TimelineV2Client timelineClient =
+ TimelineV2Client.createTimelineClient(appId);
timelineClient.init(getConfig());
timelineClient.start();
appToClientMap.put(appId, timelineClient);
@@ -399,7 +399,7 @@ public class NMTimelinePublisher extends CompositeService {
}
public void stopTimelineClient(ApplicationId appId) {
- TimelineClient client = appToClientMap.remove(appId);
+ TimelineV2Client client = appToClientMap.remove(appId);
if (client != null) {
client.stop();
}
@@ -407,13 +407,13 @@ public class NMTimelinePublisher extends CompositeService {
public void setTimelineServiceAddress(ApplicationId appId,
String collectorAddr) {
- TimelineClient client = appToClientMap.get(appId);
+ TimelineV2Client client = appToClientMap.get(appId);
if (client != null) {
client.setTimelineServiceAddress(collectorAddr);
}
}
- private TimelineClient getTimelineClient(ApplicationId appId) {
+ private TimelineV2Client getTimelineClient(ApplicationId appId) {
return appToClientMap.get(appId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index ae9397a..e116122 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
- final DummyTimelineClient timelineClient = new DummyTimelineClient();
+ final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
@@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
}
}
- protected static class DummyTimelineClient extends TimelineClientImpl {
+ protected static class DummyTimelineClient extends TimelineV2ClientImpl {
+ public DummyTimelineClient(ApplicationId appId) {
+ super(appId);
+ }
+
private TimelineEntity[] lastPublishedEntities;
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 3ec222f..07058f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
@@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutEntities() throws Exception {
- TimelineClient client =
- TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+ TimelineV2Client client =
+ TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
@@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
@Test
public void testPutExtendedEntities() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
- TimelineClient client =
- TimelineClient.createTimelineClient(appId);
+ TimelineV2Client client =
+ TimelineV2Client.createTimelineClient(appId);
try {
// set the timeline service address manually
client.setTimelineServiceAddress(
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org