You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2017/02/16 19:41:26 UTC

[1/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 5690b51ef -> 4fa1afdb8


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
new file mode 100644
index 0000000..b5b5f77
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientRequest;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+/**
+ * Utility Connector class which is used by timeline clients to securely get
+ * connected to the timeline server.
+ *
+ */
+public class TimelineConnector extends AbstractService {
+
+  private static final Joiner JOINER = Joiner.on("");
+  private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
+  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+
+  private SSLFactory sslFactory;
+  private Client client;
+  private ConnectionConfigurator connConfigurator;
+  private DelegationTokenAuthenticator authenticator;
+  private DelegationTokenAuthenticatedURL.Token token;
+  private UserGroupInformation authUgi;
+  private String doAsUser;
+  @VisibleForTesting
+  TimelineClientConnectionRetry connectionRetry;
+  private boolean requireConnectionRetry;
+
+  public TimelineConnector(boolean requireConnectionRetry,
+      UserGroupInformation authUgi, String doAsUser,
+      DelegationTokenAuthenticatedURL.Token token) {
+    super("TimelineConnector");
+    this.requireConnectionRetry = requireConnectionRetry;
+    this.authUgi = authUgi;
+    this.doAsUser = doAsUser;
+    this.token = token;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    ClientConfig cc = new DefaultClientConfig();
+    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+
+    sslFactory = getSSLFactory(conf);
+    connConfigurator = getConnConfigurator(sslFactory);
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      authenticator = new KerberosDelegationTokenAuthenticator();
+    } else {
+      authenticator = new PseudoDelegationTokenAuthenticator();
+    }
+    authenticator.setConnectionConfigurator(connConfigurator);
+
+    connectionRetry = new TimelineClientConnectionRetry(conf);
+    client =
+        new Client(
+            new URLConnectionClientHandler(new TimelineURLConnectionFactory(
+                authUgi, authenticator, connConfigurator, token, doAsUser)),
+            cc);
+    if (requireConnectionRetry) {
+      TimelineJerseyRetryFilter retryFilter =
+          new TimelineJerseyRetryFilter(connectionRetry);
+      client.addFilter(retryFilter);
+    }
+  }
+
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
+    = new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection conn)
+            throws IOException {
+          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+          return conn;
+        }
+      };
+
+  private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
+    try {
+      return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
+    } catch (Exception e) {
+      LOG.debug("Cannot load customized ssl related configuration. "
+          + "Fallback to system-generic settings.", e);
+      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+    }
+  }
+
+  private static ConnectionConfigurator initSslConnConfigurator(
+      final int timeout, SSLFactory sslFactory)
+      throws IOException, GeneralSecurityException {
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+
+    sf = sslFactory.createSSLSocketFactory();
+    hv = sslFactory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+
+  protected SSLFactory getSSLFactory(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    newSSLFactory.init();
+    return newSSLFactory;
+  }
+
+  private static void setTimeouts(URLConnection connection, int socketTimeout) {
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+
+  public static URI constructResURI(Configuration conf, String address,
+      String uri) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+            address, uri));
+  }
+
+  DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+  }
+
+  protected void serviceStop() {
+    if (this.sslFactory != null) {
+      this.sslFactory.destroy();
+    }
+  }
+
+  public Client getClient() {
+    return client;
+  }
+
+  public Object operateDelegationToken(
+      final PrivilegedExceptionAction<?> action)
+      throws IOException, YarnException {
+    // Set up the retry operation
+    TimelineClientRetryOp tokenRetryOp =
+        createRetryOpForOperateDelegationToken(action);
+
+    return connectionRetry.retryOn(tokenRetryOp);
+  }
+
+  @Private
+  @VisibleForTesting
+  TimelineClientRetryOp createRetryOpForOperateDelegationToken(
+      final PrivilegedExceptionAction<?> action) throws IOException {
+    return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
+        action);
+  }
+
+  /**
+   * Abstract class for an operation that should be retried by timeline client.
+   */
+  @Private
+  @VisibleForTesting
+  public static abstract class TimelineClientRetryOp {
+    // The operation that should be retried
+    public abstract Object run() throws IOException;
+
+    // The method to indicate if we should retry given the incoming exception
+    public abstract boolean shouldRetryOn(Exception e);
+  }
+
+  private static class TimelineURLConnectionFactory
+      implements HttpURLConnectionFactory {
+    private DelegationTokenAuthenticator authenticator;
+    private UserGroupInformation authUgi;
+    private ConnectionConfigurator connConfigurator;
+    private Token token;
+    private String doAsUser;
+
+    public TimelineURLConnectionFactory(UserGroupInformation authUgi,
+        DelegationTokenAuthenticator authenticator,
+        ConnectionConfigurator connConfigurator,
+        DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
+      this.authUgi = authUgi;
+      this.authenticator = authenticator;
+      this.connConfigurator = connConfigurator;
+      this.token = token;
+      this.doAsUser = doAsUser;
+    }
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url)
+        throws IOException {
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return new DelegationTokenAuthenticatedURL(authenticator,
+            connConfigurator).openConnection(url, token, doAsUser);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (AuthenticationException ae) {
+        throw new IOException(ae);
+      }
+    }
+
+  }
+
+  // Class to handle retry
+  // Outside this class, only visible to tests
+  @Private
+  @VisibleForTesting
+  static class TimelineClientConnectionRetry {
+
+    // maxRetries < 0 means keep trying
+    @Private
+    @VisibleForTesting
+    public int maxRetries;
+
+    @Private
+    @VisibleForTesting
+    public long retryInterval;
+
+    // Indicates if retries happened last time. Only tests should read it.
+    // In unit tests, retryOn() calls should _not_ be concurrent.
+    private boolean retried = false;
+
+    @Private
+    @VisibleForTesting
+    boolean getRetired() {
+      return retried;
+    }
+
+    // Constructor with default retry settings
+    public TimelineClientConnectionRetry(Configuration conf) {
+      Preconditions.checkArgument(
+          conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES)
+              >= -1,
+          "%s property value should be greater than or equal to -1",
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      Preconditions.checkArgument(
+          conf.getLong(
+              YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+              YarnConfiguration.
+                DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
+          "%s property value should be greater than zero",
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      maxRetries =
+          conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      retryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+    }
+
+    public Object retryOn(TimelineClientRetryOp op)
+        throws RuntimeException, IOException {
+      int leftRetries = maxRetries;
+      retried = false;
+
+      // keep trying
+      while (true) {
+        try {
+          // try perform the op, if fail, keep retrying
+          return op.run();
+        } catch (IOException | RuntimeException e) {
+          // break if there's no retries left
+          if (leftRetries == 0) {
+            break;
+          }
+          if (op.shouldRetryOn(e)) {
+            logException(e, leftRetries);
+          } else {
+            throw e;
+          }
+        }
+        if (leftRetries > 0) {
+          leftRetries--;
+        }
+        retried = true;
+        try {
+          // sleep for the given time interval
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException ie) {
+          LOG.warn("Client retry sleep interrupted! ");
+        }
+      }
+      throw new RuntimeException("Failed to connect to timeline server. "
+          + "Connection retries limit exceeded. "
+          + "The posted timeline event may be missing");
+    };
+
+    private void logException(Exception e, int leftRetries) {
+      if (leftRetries > 0) {
+        LOG.info(
+            "Exception caught by TimelineClientConnectionRetry," + " will try "
+                + leftRetries + " more time(s).\nMessage: " + e.getMessage());
+      } else {
+        // note that maxRetries may be -1 at the very beginning
+        LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+            + " will keep retrying.\nMessage: " + e.getMessage());
+      }
+    }
+  }
+
+  private static class TimelineJerseyRetryFilter extends ClientFilter {
+    private TimelineClientConnectionRetry connectionRetry;
+
+    public TimelineJerseyRetryFilter(
+        TimelineClientConnectionRetry connectionRetry) {
+      this.connectionRetry = connectionRetry;
+    }
+
+    @Override
+    public ClientResponse handle(final ClientRequest cr)
+        throws ClientHandlerException {
+      // Set up the retry operation
+      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
+        @Override
+        public Object run() {
+          // Try pass the request, if fail, keep retrying
+          return getNext().handle(cr);
+        }
+
+        @Override
+        public boolean shouldRetryOn(Exception e) {
+          // Only retry on connection exceptions
+          return (e instanceof ClientHandlerException)
+              && (e.getCause() instanceof ConnectException
+                  || e.getCause() instanceof SocketTimeoutException);
+        }
+      };
+      try {
+        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+      } catch (IOException e) {
+        throw new ClientHandlerException(
+            "Jersey retry failed!\nMessage: " + e.getMessage());
+      }
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public static class TimelineClientRetryOpForOperateDelegationToken
+      extends TimelineClientRetryOp {
+
+    private final UserGroupInformation authUgi;
+    private final PrivilegedExceptionAction<?> action;
+
+    public TimelineClientRetryOpForOperateDelegationToken(
+        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
+      this.authUgi = authUgi;
+      this.action = action;
+    }
+
+    @Override
+    public Object run() throws IOException {
+      // Try pass the request, if fail, keep retrying
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return authUgi.doAs(action);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public boolean shouldRetryOn(Exception e) {
+      // retry on connection exceptions
+      // and SocketTimeoutException
+      return (e instanceof ConnectException
+          || e instanceof SocketTimeoutException);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
new file mode 100644
index 0000000..cef7e5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ * Implementation of timeline v2 client interface.
+ *
+ */
+public class TimelineV2ClientImpl extends TimelineV2Client {
+  private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
+
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+
+  private TimelineEntityDispatcher entityDispatcher;
+  private volatile String timelineServiceAddress;
+
+  // Retry parameters for identifying new timeline service
+  // TODO consider to merge with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+
+  private TimelineConnector connector;
+
+  private ApplicationId contextAppId;
+
+  public TimelineV2ClientImpl(ApplicationId appId) {
+    super(TimelineV2ClientImpl.class.getName());
+    this.contextAppId = appId;
+  }
+
+  public ApplicationId getContextAppId() {
+    return contextAppId;
+  }
+
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceEnabled(conf)
+        || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+      throw new IOException("Timeline V2 client is not properly configured. "
+          + "Either timeline service is not enabled or version is not set to"
+          + " 2");
+    }
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUgi = ugi.getRealUser();
+    String doAsUser = null;
+    UserGroupInformation authUgi = null;
+    if (realUgi != null) {
+      authUgi = realUgi;
+      doAsUser = ugi.getShortUserName();
+    } else {
+      authUgi = ugi;
+      doAsUser = null;
+    }
+
+    // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
+    DelegationTokenAuthenticatedURL.Token token =
+        new DelegationTokenAuthenticatedURL.Token();
+    connector = new TimelineConnector(false, authUgi, doAsUser, token);
+    addIfService(connector);
+
+    // new version need to auto discovery (with retry till ATS v2 address is
+    // got).
+    maxServiceRetries =
+        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+    serviceRetryInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+    entityDispatcher = new TimelineEntityDispatcher(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    entityDispatcher.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    entityDispatcher.stop();
+    super.serviceStop();
+  }
+
+  @Override
+  public void putEntities(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(true, entities);
+  }
+
+  @Override
+  public void putEntitiesAsync(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(false, entities);
+  }
+
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+
+  @Private
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
+      Object obj) throws IOException, YarnException {
+
+    int retries = verifyRestEndPointAvailable();
+
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = TimelineConnector.constructResURI(getConfig(),
+            timelineServiceAddress, RESOURCE_URI_STR_V2);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      } catch (IOException e) {
+        // handle exception for timelineServiceAddress being updated.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+
+  /**
+   * Check if reaching to maximum of retries.
+   *
+   * @param retries
+   * @param e
+   */
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while retrying to connect to ATS");
+      }
+    } else {
+      StringBuilder msg =
+          new StringBuilder("TimelineClient has reached to max retry times : ");
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
+    }
+  }
+
+  protected void putObjects(URI base, String path,
+      MultivaluedMap<String, String> params, Object obj)
+      throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = connector.getClient().resource(base).path(path).queryParams(params)
+          .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // runtime exception is expected if the client cannot connect the server
+      String msg = "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re);
+    }
+    if (resp == null || resp.getStatusInfo()
+        .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
+      String msg =
+          "Response from the timeline server is " + ((resp == null) ? "null"
+              : "not successful," + " HTTP error code: " + resp.getStatus()
+                  + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new YarnException(msg);
+    }
+  }
+
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress could haven't be initialized yet
+    // or stale (only for new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached to max retry times : "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " Timeline Auxiliary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries;
+  }
+
+  /**
+   * Poll TimelineServiceAddress for maximum of retries times if it is null.
+   *
+   * @param retries
+   * @return the left retry times
+   * @throws IOException
+   */
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect ATS");
+      }
+      retries--;
+    }
+    return retries;
+  }
+
+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final TimelineEntities entities;
+    private final boolean isSync;
+
+    EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", getContextAppId().toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+
+    public boolean isSync() {
+      return isSync;
+    }
+
+    public TimelineEntities getEntities() {
+      return entities;
+    }
+  }
+
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them in async.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timelineclient will wait for draining after
+     * stop.
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if its sync
+              // call push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+                LOG.info("Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+              LOG.info("Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published @ the max for
+            // 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed stop publishing further....
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + "timelineEntities will not"
+                      + " be published");
+                  // if some entities were not drained then we need interrupt
+                  // the threads which had put sync EntityHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }
+
+        /**
+         * Publishes the given EntitiesHolder and return immediately if sync
+         * call, else tries to fetch the EntitiesHolder from the queue in non
+         * blocking fashion and collate the Entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+            entitiesHolder.run();
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync put Entities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue just publish and get back to the
+              // blocked wait state
+              entitiesHolder.run();
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the prev async entities first
+              entitiesHolder.run();
+              // and then flush the sync entity
+              nextEntityInTheQueue.run();
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of the async
+                // putEntites merged reaches the desired limit. To avoid
+                // collecting multiple entities and delaying for a long
+                // time.
+                entitiesHolder.run();
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+
+    public void dispatchEntities(boolean sync,
+        TimelineEntity[] entitiesTobePublished) throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of stopping,"
+            + " not accepting any more TimelineEntities");
+      }
+
+      // wrap all TimelineEntity into TimelineEntities object
+      TimelineEntities entities = new TimelineEntities();
+      for (TimelineEntity entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+
+      // created a holder and place it in queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+
+      if (sync) {
+        // In sync call we need to wait till its published and if any error then
+        // throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+
+    public void stop() {
+      LOG.info("Stopping TimelineClient.");
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index bfc7cbd..f42c078 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -215,11 +215,11 @@ public class TestTimelineClient {
           + "Timeline server should be off to run this test. ");
     } catch (RuntimeException ce) {
       Assert.assertTrue(
-        "Handler exception for reason other than retry: " + ce.getMessage(),
-        ce.getMessage().contains("Connection retries limit exceeded"));
+          "Handler exception for reason other than retry: " + ce.getMessage(),
+          ce.getMessage().contains("Connection retries limit exceeded"));
       // we would expect this exception here, check if the client has retried
-      Assert.assertTrue("Retry filter didn't perform any retries! ", client
-        .connectionRetry.getRetired());
+      Assert.assertTrue("Retry filter didn't perform any retries! ",
+          client.connector.connectionRetry.getRetired());
     }
   }
 
@@ -318,7 +318,7 @@ public class TestTimelineClient {
             .getMessage().contains("Connection retries limit exceeded"));
     // we would expect this exception here, check if the client has retried
     Assert.assertTrue("Retry filter didn't perform any retries! ",
-        client.connectionRetry.getRetired());
+        client.connector.connectionRetry.getRetired());
   }
 
   public static ClientResponse mockEntityClientResponse(
@@ -419,17 +419,26 @@ public class TestTimelineClient {
   private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
       YarnConfiguration conf) {
     TimelineClientImpl client = new TimelineClientImpl() {
-
       @Override
-      public TimelineClientRetryOp
-          createTimelineClientRetryOpForOperateDelegationToken(
-              final PrivilegedExceptionAction<?> action) throws IOException {
-        TimelineClientRetryOpForOperateDelegationToken op =
-            spy(new TimelineClientRetryOpForOperateDelegationToken(
-            UserGroupInformation.getCurrentUser(), action));
-        doThrow(new SocketTimeoutException("Test socketTimeoutException"))
-            .when(op).run();
-        return op;
+      protected TimelineConnector createTimelineConnector() {
+        TimelineConnector connector =
+            new TimelineConnector(true, authUgi, doAsUser, token) {
+              @Override
+              public TimelineClientRetryOp
+                createRetryOpForOperateDelegationToken(
+                  final PrivilegedExceptionAction<?> action)
+                  throws IOException {
+                TimelineClientRetryOpForOperateDelegationToken op =
+                    spy(new TimelineClientRetryOpForOperateDelegationToken(
+                        UserGroupInformation.getCurrentUser(), action));
+                doThrow(
+                    new SocketTimeoutException("Test socketTimeoutException"))
+                        .when(op).run();
+                return op;
+              }
+            };
+        addIfService(connector);
+        return connector;
       }
     };
     client.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
index 5813340..c5b02fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
   public void setup() {
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
     if (!currTestName.getMethodName()
         .contains("testRetryOnConnectionFailure")) {
@@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
   }
 
   private class TestV2TimelineClientForExceptionHandling
-      extends TimelineClientImpl {
+      extends TimelineV2ClientImpl {
     public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
       super(id);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 8994582..ce2c656 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -76,7 +76,7 @@ public class NMTimelinePublisher extends CompositeService {
 
   private String httpAddress;
 
-  private final Map<ApplicationId, TimelineClient> appToClientMap;
+  private final Map<ApplicationId, TimelineV2Client> appToClientMap;
 
   public NMTimelinePublisher(Context context) {
     super(NMTimelinePublisher.class.getName());
@@ -102,7 +102,7 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   @VisibleForTesting
-  Map<ApplicationId, TimelineClient> getAppToClientMap() {
+  Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
     return appToClientMap;
   }
 
@@ -145,7 +145,7 @@ public class NMTimelinePublisher extends CompositeService {
       try {
         // no need to put it as part of publisher as timeline client already has
         // Queuing concept
-        TimelineClient timelineClient = getTimelineClient(appId);
+        TimelineV2Client timelineClient = getTimelineClient(appId);
         if (timelineClient != null) {
           timelineClient.putEntitiesAsync(entity);
         } else {
@@ -234,7 +234,7 @@ public class NMTimelinePublisher extends CompositeService {
     try {
       // no need to put it as part of publisher as timeline client already has
       // Queuing concept
-      TimelineClient timelineClient = getTimelineClient(appId);
+      TimelineV2Client timelineClient = getTimelineClient(appId);
       if (timelineClient != null) {
         timelineClient.putEntitiesAsync(entity);
       } else {
@@ -265,7 +265,7 @@ public class NMTimelinePublisher extends CompositeService {
         LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
             + TimelineUtils.dumpTimelineRecordtoJSON(entity));
       }
-      TimelineClient timelineClient = getTimelineClient(appId);
+      TimelineV2Client timelineClient = getTimelineClient(appId);
       if (timelineClient != null) {
         timelineClient.putEntities(entity);
       } else {
@@ -382,8 +382,8 @@ public class NMTimelinePublisher extends CompositeService {
 
   public void createTimelineClient(ApplicationId appId) {
     if (!appToClientMap.containsKey(appId)) {
-      TimelineClient timelineClient =
-          TimelineClient.createTimelineClient(appId);
+      TimelineV2Client timelineClient =
+          TimelineV2Client.createTimelineClient(appId);
       timelineClient.init(getConfig());
       timelineClient.start();
       appToClientMap.put(appId, timelineClient);
@@ -391,7 +391,7 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   public void stopTimelineClient(ApplicationId appId) {
-    TimelineClient client = appToClientMap.remove(appId);
+    TimelineV2Client client = appToClientMap.remove(appId);
     if (client != null) {
       client.stop();
     }
@@ -399,13 +399,13 @@ public class NMTimelinePublisher extends CompositeService {
 
   public void setTimelineServiceAddress(ApplicationId appId,
       String collectorAddr) {
-    TimelineClient client = appToClientMap.get(appId);
+    TimelineV2Client client = appToClientMap.get(appId);
     if (client != null) {
       client.setTimelineServiceAddress(collectorAddr);
     }
   }
 
-  private TimelineClient getTimelineClient(ApplicationId appId) {
+  private TimelineV2Client getTimelineClient(ApplicationId appId) {
     return appToClientMap.get(appId);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index ae9397a..e116122 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
   public void testContainerResourceUsage() {
     Context context = mock(Context.class);
     @SuppressWarnings("unchecked")
-    final DummyTimelineClient timelineClient = new DummyTimelineClient();
+    final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
     when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
     when(context.getHttpPort()).thenReturn(0);
     NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
@@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
     }
   }
 
-  protected static class DummyTimelineClient extends TimelineClientImpl {
+  protected static class DummyTimelineClient extends TimelineV2ClientImpl {
+    public DummyTimelineClient(ApplicationId appId) {
+      super(appId);
+    }
+
     private TimelineEntity[] lastPublishedEntities;
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 3ec222f..07058f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
@@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
 
   @Test
   public void testPutEntities() throws Exception {
-    TimelineClient client =
-        TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+    TimelineV2Client client =
+        TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
     try {
       // set the timeline service address manually
       client.setTimelineServiceAddress(
@@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
   @Test
   public void testPutExtendedEntities() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    TimelineClient client =
-        TimelineClient.createTimelineClient(appId);
+    TimelineV2Client client =
+        TimelineV2Client.createTimelineClient(appId);
     try {
       // set the timeline service address manually
       client.setTimelineServiceAddress(


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.

Posted by sj...@apache.org.
YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4fa1afdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4fa1afdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4fa1afdb

Branch: refs/heads/trunk
Commit: 4fa1afdb883dab8786d2fb5c72a195dd2e87d711
Parents: 5690b51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Feb 16 11:41:04 2017 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Feb 16 11:41:04 2017 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  57 +-
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  14 +-
 .../v2/app/rm/RMContainerAllocator.java         |   4 +-
 .../jobhistory/TestJobHistoryEventHandler.java  |   8 +-
 .../distributedshell/ApplicationMaster.java     |  98 ++-
 .../hadoop/yarn/client/api/AMRMClient.java      |  40 +-
 .../yarn/client/api/async/AMRMClientAsync.java  |  21 +-
 .../api/async/impl/AMRMClientAsyncImpl.java     |   5 +-
 .../yarn/client/api/impl/YarnClientImpl.java    |  15 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |  94 +--
 .../yarn/client/api/TimelineV2Client.java       |  92 +++
 .../client/api/impl/TimelineClientImpl.java     | 825 ++-----------------
 .../yarn/client/api/impl/TimelineConnector.java | 440 ++++++++++
 .../client/api/impl/TimelineV2ClientImpl.java   | 459 +++++++++++
 .../client/api/impl/TestTimelineClient.java     |  39 +-
 .../api/impl/TestTimelineClientV2Impl.java      |   4 +-
 .../timelineservice/NMTimelinePublisher.java    |  22 +-
 .../TestNMTimelinePublisher.java                |  10 +-
 .../TestTimelineServiceClientIntegration.java   |  10 +-
 19 files changed, 1272 insertions(+), 985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 0cc605c..285d36e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientHandlerException;
 
@@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
  */
 public class JobHistoryEventHandler extends AbstractService
     implements EventHandler<JobHistoryEvent> {
-  private static final JsonNodeFactory FACTORY =
-      new ObjectMapper().getNodeFactory();
 
   private final AppContext context;
   private final int startCount;
@@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
+  @VisibleForTesting
   protected TimelineClient timelineClient;
-
-  private boolean timelineServiceV2Enabled = false;
+  @VisibleForTesting
+  protected TimelineV2Client timelineV2Client;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
       LOG.info("Emitting job history data to the timeline service is enabled");
       if (YarnConfiguration.timelineServiceEnabled(conf)) {
-
-        timelineClient =
-            ((MRAppMaster.RunningAppContext)context).getTimelineClient();
-        timelineClient.init(conf);
-        timelineServiceV2Enabled =
-            YarnConfiguration.timelineServiceV2Enabled(conf);
+        boolean timelineServiceV2Enabled =
+            ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+        if(timelineServiceV2Enabled) {
+          timelineV2Client =
+              ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
+          timelineV2Client.init(conf);
+        } else {
+          timelineClient =
+              ((MRAppMaster.RunningAppContext) context).getTimelineClient();
+          timelineClient.init(conf);
+        }
         LOG.info("Timeline service is enabled; version: " +
             YarnConfiguration.getTimelineServiceVersion(conf));
       } else {
@@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService
   protected void serviceStart() throws Exception {
     if (timelineClient != null) {
       timelineClient.start();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.start();
     }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
@@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService
     }
     if (timelineClient != null) {
       timelineClient.stop();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.stop();
     }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
@@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        if (timelineClient != null) {
-          if (timelineServiceV2Enabled) {
-            processEventForNewTimelineService(historyEvent, event.getJobID(),
-                event.getTimestamp());
-          } else {
-            processEventForTimelineServer(historyEvent, event.getJobID(),
-                event.getTimestamp());
-          }
+        if (timelineV2Client != null) {
+          processEventForNewTimelineService(historyEvent, event.getJobID(),
+              event.getTimestamp());
+        } else if (timelineClient != null) {
+          processEventForTimelineServer(historyEvent, event.getJobID(),
+              event.getTimestamp());
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
@@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService
         configSize += size;
         if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
           if (jobEntityForConfigs.getConfigs().size() > 0) {
-            timelineClient.putEntities(jobEntityForConfigs);
-            timelineClient.putEntities(appEntityForConfigs);
+            timelineV2Client.putEntities(jobEntityForConfigs);
+            timelineV2Client.putEntities(appEntityForConfigs);
             jobEntityForConfigs = createJobEntity(jobId);
             appEntityForConfigs = new ApplicationEntity();
             appEntityForConfigs.setId(appId);
@@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService
         appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
       }
       if (configSize > 0) {
-        timelineClient.putEntities(jobEntityForConfigs);
-        timelineClient.putEntities(appEntityForConfigs);
+        timelineV2Client.putEntities(jobEntityForConfigs);
+        timelineV2Client.putEntities(appEntityForConfigs);
       }
     } catch (IOException | YarnException e) {
       LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
@@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService
     }
     try {
       if (appEntityWithJobMetrics == null) {
-        timelineClient.putEntitiesAsync(tEntity);
+        timelineV2Client.putEntitiesAsync(tEntity);
       } else {
-        timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
+        timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
       }
     } catch (IOException | YarnException e) {
       LOG.error("Failed to process Event " + event.getEventType()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 835c0aa..12df83d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.crypto.KeyGenerator;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import javax.crypto.KeyGenerator;
-
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService {
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
     private TimelineClient timelineClient = null;
+    private TimelineV2Client timelineV2Client = null;
 
     private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
 
@@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService {
 
         if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
           // create new version TimelineClient
-          timelineClient = TimelineClient.createTimelineClient(
+          timelineV2Client = TimelineV2Client.createTimelineClient(
               appAttemptID.getApplicationId());
         } else {
           timelineClient = TimelineClient.createTimelineClient();
@@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService {
       return taskAttemptFinishingMonitor;
     }
 
-    // Get Timeline Collector's address (get sync from RM)
     public TimelineClient getTimelineClient() {
       return timelineClient;
     }
+
+    // Get Timeline Collector's address (get sync from RM)
+    public TimelineV2Client getTimelineV2Client() {
+      return timelineV2Client;
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 31bc380..1f88a2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     MRAppMaster.RunningAppContext appContext =
         (MRAppMaster.RunningAppContext)this.getContext();
     if (collectorAddr != null && !collectorAddr.isEmpty()
-        && appContext.getTimelineClient() != null) {
-      appContext.getTimelineClient().setTimelineServiceAddress(
+        && appContext.getTimelineV2Client() != null) {
+      appContext.getTimelineV2Client().setTimelineServiceAddress(
           response.getCollectorAddr());
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 0b33d6b..6c5e604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -29,8 +29,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler {
     if (mockContext instanceof RunningAppContext) {
       when(((RunningAppContext)mockContext).getTimelineClient()).
           thenReturn(TimelineClient.createTimelineClient());
+      when(((RunningAppContext) mockContext).getTimelineV2Client())
+          .thenReturn(TimelineV2Client
+              .createTimelineClient(ApplicationId.newInstance(0, 1)));
     }
     return mockContext;
   }
@@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   protected void serviceStart() {
     if (timelineClient != null) {
       timelineClient.start();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.start();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 5a06ef6..4daebb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -219,7 +220,9 @@ public class ApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
-  private boolean timelineServiceV2 = false;
+  private boolean timelineServiceV2Enabled = false;
+
+  private boolean timelineServiceV1Enabled = false;
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -293,6 +296,10 @@ public class ApplicationMaster {
   // Timeline Client
   @VisibleForTesting
   TimelineClient timelineClient;
+
+  // Timeline v2 Client
+  private TimelineV2Client timelineV2Client;
+
   static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
   static final String APPID_TIMELINE_FILTER_NAME = "appId";
   static final String USER_TIMELINE_FILTER_NAME = "user";
@@ -556,9 +563,12 @@ public class ApplicationMaster {
         "container_retry_interval", "0"));
 
     if (YarnConfiguration.timelineServiceEnabled(conf)) {
-      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
+      timelineServiceV2Enabled =
+          ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+      timelineServiceV1Enabled = !timelineServiceV2Enabled;
     } else {
       timelineClient = null;
+      timelineV2Client = null;
       LOG.warn("Timeline service is not enabled");
     }
 
@@ -621,18 +631,17 @@ public class ApplicationMaster {
     nmClientAsync.start();
 
     startTimelineClient(conf);
-    if (timelineServiceV2) {
+    if (timelineServiceV2Enabled) {
       // need to bind timelineClient
-      amRMClient.registerTimelineClient(timelineClient);
+      amRMClient.registerTimelineV2Client(timelineV2Client);
     }
-    if(timelineClient != null) {
-      if (timelineServiceV2) {
-        publishApplicationAttemptEventOnTimelineServiceV2(
-            DSEvent.DS_APP_ATTEMPT_START);
-      } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
-      }
+
+    if (timelineServiceV2Enabled) {
+      publishApplicationAttemptEventOnTimelineServiceV2(
+          DSEvent.DS_APP_ATTEMPT_START);
+    } else if (timelineServiceV1Enabled) {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
     }
 
     // Setup local RPC Server to accept status requests directly from clients
@@ -704,18 +713,21 @@ public class ApplicationMaster {
         public Void run() throws Exception {
           if (YarnConfiguration.timelineServiceEnabled(conf)) {
             // Creating the Timeline Client
-            if (timelineServiceV2) {
-              timelineClient = TimelineClient.createTimelineClient(
+            if (timelineServiceV2Enabled) {
+              timelineV2Client = TimelineV2Client.createTimelineClient(
                   appAttemptID.getApplicationId());
+              timelineV2Client.init(conf);
+              timelineV2Client.start();
               LOG.info("Timeline service V2 client is enabled");
             } else {
               timelineClient = TimelineClient.createTimelineClient();
+              timelineClient.init(conf);
+              timelineClient.start();
               LOG.info("Timeline service V1 client is enabled");
             }
-            timelineClient.init(conf);
-            timelineClient.start();
           } else {
             timelineClient = null;
+            timelineV2Client = null;
             LOG.warn("Timeline service is not enabled");
           }
           return null;
@@ -741,14 +753,12 @@ public class ApplicationMaster {
       } catch (InterruptedException ex) {}
     }
 
-    if (timelineClient != null) {
-      if (timelineServiceV2) {
-        publishApplicationAttemptEventOnTimelineServiceV2(
-            DSEvent.DS_APP_ATTEMPT_END);
-      } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
-      }
+    if (timelineServiceV2Enabled) {
+      publishApplicationAttemptEventOnTimelineServiceV2(
+          DSEvent.DS_APP_ATTEMPT_END);
+    } else if (timelineServiceV1Enabled) {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
     }
 
     // Join all launched threads
@@ -797,8 +807,10 @@ public class ApplicationMaster {
     amRMClient.stop();
 
     // Stop Timeline Client
-    if(timelineClient != null) {
+    if(timelineServiceV1Enabled) {
       timelineClient.stop();
+    } else if (timelineServiceV2Enabled) {
+      timelineV2Client.stop();
     }
 
     return success;
@@ -853,16 +865,14 @@ public class ApplicationMaster {
           LOG.info("Container completed successfully." + ", containerId="
               + containerStatus.getContainerId());
         }
-        if(timelineClient != null) {
-          if (timelineServiceV2) {
-            publishContainerEndEventOnTimelineServiceV2(containerStatus);
-          } else {
-            publishContainerEndEvent(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
-          }
+        if (timelineServiceV2Enabled) {
+          publishContainerEndEventOnTimelineServiceV2(containerStatus);
+        } else if (timelineServiceV1Enabled) {
+          publishContainerEndEvent(timelineClient, containerStatus, domainId,
+              appSubmitterUgi);
         }
       }
-      
+
       // ask for more containers if any failed
       int askCount = numTotalContainers - numRequestedContainers.get();
       numRequestedContainers.addAndGet(askCount);
@@ -983,15 +993,13 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(
             containerId, container.getNodeId());
       }
-      if(applicationMaster.timelineClient != null) {
-        if (applicationMaster.timelineServiceV2) {
-          applicationMaster.publishContainerStartEventOnTimelineServiceV2(
-              container);
-        } else {
-          applicationMaster.publishContainerStartEvent(
-              applicationMaster.timelineClient, container,
-              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
-        }
+      if (applicationMaster.timelineServiceV2Enabled) {
+        applicationMaster
+            .publishContainerStartEventOnTimelineServiceV2(container);
+      } else if (applicationMaster.timelineServiceV1Enabled) {
+        applicationMaster.publishContainerStartEvent(
+            applicationMaster.timelineClient, container,
+            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
       }
     }
 
@@ -1371,7 +1379,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntities(entity);
+          timelineV2Client.putEntities(entity);
           return null;
         }
       });
@@ -1404,7 +1412,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntities(entity);
+          timelineV2Client.putEntities(entity);
           return null;
         }
       });
@@ -1438,7 +1446,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntitiesAsync(entity);
+          timelineV2Client.putEntitiesAsync(entity);
           return null;
         }
       });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 15d0065..69f3777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
 
-  private TimelineClient timelineClient;
+  private TimelineV2Client timelineV2Client;
+  private boolean timelineServiceV2Enabled;
 
   /**
    * Create a new instance of AMRMClient.
@@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     nmTokenCache = NMTokenCache.getSingleton();
   }
 
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
+  }
+
   /**
    * Object to represent a single container request for resources. Scheduler
    * documentation should be consulted for the specifics of how the parameters
@@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
   }
 
   /**
-   * Register TimelineClient to AMRMClient.
-   * @param client the timeline client to register
+   * Register TimelineV2Client to AMRMClient. Writer's address for the timeline
+   * V2 client will be updated dynamically if registered.
+   *
+   * @param client the timeline v2 client to register
+   * @throws YarnException when this method is invoked even when ATS V2 is not
+   *           configured.
    */
-  public void registerTimelineClient(TimelineClient client) {
-    this.timelineClient = client;
+  public void registerTimelineV2Client(TimelineV2Client client)
+      throws YarnException {
+    if (timelineServiceV2Enabled) {
+      timelineV2Client = client;
+    } else {
+      LOG.error("Trying to register timeline v2 client when not configured.");
+      throw new YarnException(
+          "register timeline v2 client when not configured.");
+    }
   }
 
   /**
-   * Get registered timeline client.
-   * @return the registered timeline client
+   * Get registered timeline v2 client.
+   * @return the registered timeline v2 client
    */
-  public TimelineClient getRegisteredTimelineClient() {
-    return this.timelineClient;
+  public TimelineV2Client getRegisteredTimelineV2Client() {
+    return this.timelineV2Client;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 4cb27cd..1ecfe1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.client.api.async;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 
 /**
  * <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -346,17 +346,20 @@ extends AbstractService {
   /**
    * Register TimelineClient to AMRMClient.
    * @param timelineClient
+   * @throws YarnException when this method is invoked even when ATS V2 is not
+   *           configured.
    */
-  public void registerTimelineClient(TimelineClient timelineClient) {
-    client.registerTimelineClient(timelineClient);
+  public void registerTimelineV2Client(TimelineV2Client timelineClient)
+      throws YarnException {
+    client.registerTimelineV2Client(timelineClient);
   }
 
   /**
    * Get registered timeline client.
    * @return the registered timeline client
    */
-  public TimelineClient getRegisteredTimelineClient() {
-    return client.getRegisteredTimelineClient();
+  public TimelineV2Client getRegisteredTimelineV2Client() {
+    return client.getRegisteredTimelineV2Client();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 9e2c0e5..6711da2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -326,7 +326,8 @@ extends AMRMClientAsync<T> {
 
           AllocateResponse response = (AllocateResponse) object;
           String collectorAddress = response.getCollectorAddr();
-          TimelineClient timelineClient = client.getRegisteredTimelineClient();
+          TimelineV2Client timelineClient =
+              client.getRegisteredTimelineV2Client();
           if (timelineClient != null && collectorAddress != null
               && !collectorAddress.isEmpty()) {
             if (collectorAddr == null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e406862..4a27fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
   Text timelineService;
   @VisibleForTesting
   String timelineDTRenewer;
-  protected boolean timelineServiceEnabled;
+  private boolean timelineV1ServiceEnabled;
   protected boolean timelineServiceBestEffort;
 
   private static final String ROOT = "root";
@@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
         YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
     }
 
+    float timelineServiceVersion =
+        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-      timelineServiceEnabled = true;
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+        && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
+            || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+      timelineV1ServiceEnabled = true;
       timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
       timelineService = TimelineUtils.buildTimelineTokenService(conf);
     }
@@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
     // TimelineServer which means we are able to get history information
     // for applications/applicationAttempts/containers by using ahsClient
     // when the TimelineServer is running.
-    if (timelineServiceEnabled || conf.getBoolean(
+    if (timelineV1ServiceEnabled || conf.getBoolean(
         YarnConfiguration.APPLICATION_HISTORY_ENABLED,
         YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
       historyServiceEnabled = true;
@@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
 
     // Automatically add the timeline DT into the CLC
     // Only when the security and the timeline service are both enabled
-    if (isSecurityEnabled() && timelineServiceEnabled) {
+    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
       addTimelineDelegationToken(appContext.getAMContainerSpec());
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index cc76718..4835239 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
 import java.io.Flushable;
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 
 /**
  * A client library that can be used to post some information in terms of a
- * number of conceptual entities.
+ * number of conceptual entities. This client library needs to be used along
+ * with Timeline V.1.x server versions.
+ * Refer {@link TimelineV2Client} for ATS V2 interface.
  */
 @Public
 @Evolving
-public abstract class TimelineClient extends AbstractService implements
+public abstract class TimelineClient extends CompositeService implements
     Flushable {
 
   /**
-   * Create a timeline client. The current UGI when the user initialize the
-   * client will be used to do the put and the delegation token operations. The
-   * current user may use {@link UserGroupInformation#doAs} another user to
-   * construct and initialize a timeline client if the following operations are
-   * supposed to be conducted by that user.
-   */
-  private ApplicationId contextAppId;
-
-  /**
    * Creates an instance of the timeline v.1.x client.
+   * The current UGI when the user initialize the client will be used to do the
+   * put and the delegation token operations. The current user may use
+   * {@link UserGroupInformation#doAs} another user to construct and initialize
+   * a timeline client if the following operations are supposed to be conducted
+   * by that user.
    *
    * @return the created timeline client instance
    */
@@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
     return client;
   }
 
-  /**
-   * Creates an instance of the timeline v.2 client.
-   *
-   * @param appId the application id with which the timeline client is
-   * associated
-   * @return the created timeline client instance
-   */
-  @Public
-  public static TimelineClient createTimelineClient(ApplicationId appId) {
-    TimelineClient client = new TimelineClientImpl(appId);
-    return client;
-  }
-
-  @Private
-  protected TimelineClient(String name, ApplicationId appId) {
+  protected TimelineClient(String name) {
     super(name);
-    setContextAppId(appId);
   }
 
   /**
@@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
   public abstract void cancelDelegationToken(
       Token<TimelineDelegationTokenIdentifier> timelineDT)
           throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Send the information of a number of conceptual entities to the timeline
-   * service v.2 collector. It is a blocking API. The method will not return
-   * until all the put entities have been persisted. If this method is invoked
-   * for a non-v.2 timeline client instance, a YarnException is thrown.
-   * </p>
-   *
-   * @param entities the collection of {@link
-   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
-   * @throws IOException
-   * @throws YarnException
-   */
-  @Public
-  public abstract void putEntities(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Send the information of a number of conceptual entities to the timeline
-   * service v.2 collector. It is an asynchronous API. The method will return
-   * once all the entities are received. If this method is invoked for a
-   * non-v.2 timeline client instance, a YarnException is thrown.
-   * </p>
-   *
-   * @param entities the collection of {@link
-   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
-   * @throws IOException
-   * @throws YarnException
-   */
-  @Public
-  public abstract void putEntitiesAsync(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Update the timeline service address where the request will be sent to.
-   * </p>
-   * @param address
-   *          the timeline service address
-   */
-  public abstract void setTimelineServiceAddress(String address);
-
-  protected ApplicationId getContextAppId() {
-    return contextAppId;
-  }
-
-  protected void setContextAppId(ApplicationId appId) {
-    this.contextAppId = appId;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
new file mode 100644
index 0000000..32cf1e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities. This client library needs to be used along
+ * with time line v.2 server version.
+ * Refer {@link TimelineClient} for ATS V1 interface.
+ */
+public abstract class TimelineV2Client extends CompositeService {
+  /**
+   * Creates an instance of the timeline v.2 client.
+   *
+   * @param appId the application id with which the timeline client is
+   *          associated
+   * @return the created timeline client instance
+   */
+  @Public
+  public static TimelineV2Client createTimelineClient(ApplicationId appId) {
+    TimelineV2Client client = new TimelineV2ClientImpl(appId);
+    return client;
+  }
+
+  protected TimelineV2Client(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is a blocking API. The method will not return
+   * until all the put entities have been persisted.
+   * </p>
+   *
+   * @param entities the collection of {@link TimelineEntity}
+   * @throws IOException  if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
+   */
+  @Public
+  public abstract void putEntities(TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is an asynchronous API. The method will return
+   * once all the entities are received.
+   * </p>
+   *
+   * @param entities the collection of {@link TimelineEntity}
+   * @throws IOException  if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
+   */
+  @Public
+  public abstract void putEntitiesAsync(TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Update the timeline service address where the request will be sent to.
+   * </p>
+   *
+   * @param address the timeline service address
+   */
+  public abstract void setTimelineServiceAddress(String address);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 4506c48..f49618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientRequest;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.client.filter.ClientFilter;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @Private
 @Evolving
@@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient {
   private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
-  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
-  private static final Joiner JOINER = Joiner.on("");
-  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
 
   private static Options opts;
   private static final String ENTITY_DATA_TYPE = "entity";
@@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
     opts.addOption("help", false, "Print usage");
   }
 
-  private Client client;
-  private ConnectionConfigurator connConfigurator;
-  private DelegationTokenAuthenticator authenticator;
-  private DelegationTokenAuthenticatedURL.Token token;
-  private UserGroupInformation authUgi;
-  private String doAsUser;
-  private Configuration configuration;
-  private float timelineServiceVersion;
-  private TimelineWriter timelineWriter;
-  private SSLFactory sslFactory;
-
-  private volatile String timelineServiceAddress;
-
-  // Retry parameters for identifying new timeline service
-  // TODO consider to merge with connection retry
-  private int maxServiceRetries;
-  private long serviceRetryInterval;
-  private boolean timelineServiceV2 = false;
-
-  @Private
   @VisibleForTesting
-  TimelineClientConnectionRetry connectionRetry;
-
-  private TimelineEntityDispatcher entityDispatcher;
-
-  // Abstract class for an operation that should be retried by timeline client
-  @Private
+  protected DelegationTokenAuthenticatedURL.Token token;
   @VisibleForTesting
-  public static abstract class TimelineClientRetryOp {
-    // The operation that should be retried
-    public abstract Object run() throws IOException;
-    // The method to indicate if we should retry given the incoming exception
-    public abstract boolean shouldRetryOn(Exception e);
-  }
-
-  // Class to handle retry
-  // Outside this class, only visible to tests
-  @Private
+  protected UserGroupInformation authUgi;
   @VisibleForTesting
-  static class TimelineClientConnectionRetry {
-
-    // maxRetries < 0 means keep trying
-    @Private
-    @VisibleForTesting
-    public int maxRetries;
-
-    @Private
-    @VisibleForTesting
-    public long retryInterval;
-
-    // Indicates if retries happened last time. Only tests should read it.
-    // In unit tests, retryOn() calls should _not_ be concurrent.
-    private boolean retried = false;
+  protected String doAsUser;
 
-    @Private
-    @VisibleForTesting
-    boolean getRetired() {
-      return retried;
-    }
-
-    // Constructor with default retry settings
-    public TimelineClientConnectionRetry(Configuration conf) {
-      Preconditions.checkArgument(conf.getInt(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
-          "%s property value should be greater than or equal to -1",
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      Preconditions
-          .checkArgument(
-              conf.getLong(
-                  YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-                  YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
-              "%s property value should be greater than zero",
-              YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-      maxRetries = conf.getInt(
-        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      retryInterval = conf.getLong(
-        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-    }
-
-    public Object retryOn(TimelineClientRetryOp op)
-        throws RuntimeException, IOException {
-      int leftRetries = maxRetries;
-      retried = false;
-
-      // keep trying
-      while (true) {
-        try {
-          // try perform the op, if fail, keep retrying
-          return op.run();
-        } catch (IOException | RuntimeException e) {
-          // break if there's no retries left
-          if (leftRetries == 0) {
-            break;
-          }
-          if (op.shouldRetryOn(e)) {
-            logException(e, leftRetries);
-          } else {
-            throw e;
-          }
-        }
-        if (leftRetries > 0) {
-          leftRetries--;
-        }
-        retried = true;
-        try {
-          // sleep for the given time interval
-          Thread.sleep(retryInterval);
-        } catch (InterruptedException ie) {
-          LOG.warn("Client retry sleep interrupted! ");
-        }
-      }
-      throw new RuntimeException("Failed to connect to timeline server. "
-          + "Connection retries limit exceeded. "
-          + "The posted timeline event may be missing");
-    };
-
-    private void logException(Exception e, int leftRetries) {
-      if (leftRetries > 0) {
-        LOG.info("Exception caught by TimelineClientConnectionRetry,"
-              + " will try " + leftRetries + " more time(s).\nMessage: "
-              + e.getMessage());
-      } else {
-        // note that maxRetries may be -1 at the very beginning
-        LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
-            + " will keep retrying.\nMessage: "
-            + e.getMessage());
-      }
-    }
-  }
+  private float timelineServiceVersion;
+  private TimelineWriter timelineWriter;
 
-  private class TimelineJerseyRetryFilter extends ClientFilter {
-    @Override
-    public ClientResponse handle(final ClientRequest cr)
-        throws ClientHandlerException {
-      // Set up the retry operation
-      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
-        @Override
-        public Object run() {
-          // Try pass the request, if fail, keep retrying
-          return getNext().handle(cr);
-        }
+  private String timelineServiceAddress;
 
-        @Override
-        public boolean shouldRetryOn(Exception e) {
-          // Only retry on connection exceptions
-          return (e instanceof ClientHandlerException)
-              && (e.getCause() instanceof ConnectException ||
-                  e.getCause() instanceof SocketTimeoutException);
-        }
-      };
-      try {
-        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
-      } catch (IOException e) {
-        throw new ClientHandlerException("Jersey retry failed!\nMessage: "
-              + e.getMessage());
-      }
-    }
-  }
+  @Private
+  @VisibleForTesting
+  TimelineConnector connector;
 
   public TimelineClientImpl() {
-    super(TimelineClientImpl.class.getName(), null);
-  }
-
-  public TimelineClientImpl(ApplicationId applicationId) {
-    super(TimelineClientImpl.class.getName(), applicationId);
-    this.timelineServiceV2 = true;
+    super(TimelineClientImpl.class.getName());
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
-    this.configuration = conf;
+    timelineServiceVersion =
+        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+    LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    if (!YarnConfiguration.timelineServiceEnabled(conf)
+        || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
+            || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+      throw new IOException("Timeline V1 client is not properly configured. "
+          + "Either timeline service is not enabled or version is not set to"
+          + " 1.x");
+    }
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUgi = ugi.getRealUser();
     if (realUgi != null) {
@@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
       authUgi = ugi;
       doAsUser = null;
     }
-    ClientConfig cc = new DefaultClientConfig();
-    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
-    connConfigurator = initConnConfigurator(conf);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      authenticator = new KerberosDelegationTokenAuthenticator();
-    } else {
-      authenticator = new PseudoDelegationTokenAuthenticator();
-    }
-    authenticator.setConnectionConfigurator(connConfigurator);
     token = new DelegationTokenAuthenticatedURL.Token();
+    connector = createTimelineConnector();
 
-    connectionRetry = new TimelineClientConnectionRetry(conf);
-    client = new Client(new URLConnectionClientHandler(
-        new TimelineURLConnectionFactory()), cc);
-    TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    // TODO need to cleanup filter retry later.
-    if (!timelineServiceV2) {
-      client.addFilter(retryFilter);
-    }
-
-    // old version timeline service need to get address from configuration
-    // while new version need to auto discovery (with retry).
-    if (timelineServiceV2) {
-      maxServiceRetries = conf.getInt(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      serviceRetryInterval = conf.getLong(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-      entityDispatcher = new TimelineEntityDispatcher(conf);
+    if (YarnConfiguration.useHttps(conf)) {
+      timelineServiceAddress =
+          conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
     } else {
-      if (YarnConfiguration.useHttps(conf)) {
-        setTimelineServiceAddress(conf.get(
-            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
-      } else {
-        setTimelineServiceAddress(conf.get(
-            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
-      }
-      timelineServiceVersion =
-          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
-      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+      timelineServiceAddress =
+          conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
     }
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  protected TimelineConnector createTimelineConnector() {
+    TimelineConnector newConnector =
+        new TimelineConnector(true, authUgi, doAsUser, token);
+    addIfService(newConnector);
+    return newConnector;
+  }
+
   @Override
   protected void serviceStart() throws Exception {
-    if (timelineServiceV2) {
-      entityDispatcher.start();
-    } else {
-      timelineWriter = createTimelineWriter(configuration, authUgi, client,
-          constructResURI(getConfig(), timelineServiceAddress, false));
-    }
+    timelineWriter = createTimelineWriter(getConfig(), authUgi,
+        connector.getClient(), TimelineConnector.constructResURI(getConfig(),
+            timelineServiceAddress, RESOURCE_URI_STR_V1));
   }
 
   protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
     if (this.timelineWriter != null) {
       this.timelineWriter.close();
     }
-    if (timelineServiceV2) {
-      entityDispatcher.stop();
-    }
-    if (this.sslFactory != null) {
-      this.sslFactory.destroy();
-    }
     super.serviceStop();
   }
 
@@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   @Override
-  public TimelinePutResponse putEntities(
-      TimelineEntity... entities) throws IOException, YarnException {
+  public TimelinePutResponse putEntities(TimelineEntity... entities)
+      throws IOException, YarnException {
     return timelineWriter.putEntities(entities);
   }
 
   @Override
-  public void putEntities(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException {
-    if (!timelineServiceV2) {
-      throw new YarnException("v.2 method is invoked on a v.1.x client");
-    }
-    entityDispatcher.dispatchEntities(true, entities);
-  }
-
-  @Override
-  public void putEntitiesAsync(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException {
-    if (!timelineServiceV2) {
-      throw new YarnException("v.2 method is invoked on a v.1.x client");
-    }
-    entityDispatcher.dispatchEntities(false, entities);
-  }
-
-  @Override
   public void putDomain(TimelineDomain domain) throws IOException,
       YarnException {
     timelineWriter.putDomain(domain);
   }
 
-  // Used for new timeline service only
-  @Private
-  protected void putObjects(String path, MultivaluedMap<String, String> params,
-      Object obj) throws IOException, YarnException {
-
-    int retries = verifyRestEndPointAvailable();
-
-    // timelineServiceAddress could be stale, add retry logic here.
-    boolean needRetry = true;
-    while (needRetry) {
-      try {
-        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
-        putObjects(uri, path, params, obj);
-        needRetry = false;
-      } catch (IOException e) {
-        // handle exception for timelineServiceAddress being updated.
-        checkRetryWithSleep(retries, e);
-        retries--;
-      }
-    }
-  }
-
-  private int verifyRestEndPointAvailable() throws YarnException {
-    // timelineServiceAddress could haven't be initialized yet
-    // or stale (only for new timeline service)
-    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
-    if (timelineServiceAddress == null) {
-      String errMessage = "TimelineClient has reached to max retry times : "
-          + this.maxServiceRetries
-          + ", but failed to fetch timeline service address. Please verify"
-          + " Timeline Auxiliary Service is configured in all the NMs";
-      LOG.error(errMessage);
-      throw new YarnException(errMessage);
-    }
-    return retries;
-  }
-
-  /**
-   * Check if reaching to maximum of retries.
-   * @param retries
-   * @param e
-   */
-  private void checkRetryWithSleep(int retries, IOException e)
-      throws YarnException, IOException {
-    if (retries > 0) {
-      try {
-        Thread.sleep(this.serviceRetryInterval);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        throw new YarnException("Interrupted while retrying to connect to ATS");
-      }
-    } else {
-      StringBuilder msg =
-          new StringBuilder("TimelineClient has reached to max retry times : ");
-      msg.append(this.maxServiceRetries);
-      msg.append(" for service address: ");
-      msg.append(timelineServiceAddress);
-      LOG.error(msg.toString());
-      throw new IOException(msg.toString(), e);
-    }
-  }
-
-  protected void putObjects(
-      URI base, String path, MultivaluedMap<String, String> params, Object obj)
-          throws IOException, YarnException {
-    ClientResponse resp;
-    try {
-      resp = client.resource(base).path(path).queryParams(params)
-          .accept(MediaType.APPLICATION_JSON)
-          .type(MediaType.APPLICATION_JSON)
-          .put(ClientResponse.class, obj);
-    } catch (RuntimeException re) {
-      // runtime exception is expected if the client cannot connect the server
-      String msg =
-          "Failed to get the response from the timeline server.";
-      LOG.error(msg, re);
-      throw new IOException(re);
-    }
-    if (resp == null ||
-        resp.getStatusInfo().getStatusCode() !=
-            ClientResponse.Status.OK.getStatusCode()) {
-      String msg = "Response from the timeline server is " +
-          ((resp == null) ? "null":
-          "not successful," + " HTTP error code: " + resp.getStatus()
-          + ", Server response:\n" + resp.getEntity(String.class));
-      LOG.error(msg);
-      throw new YarnException(msg);
-    }
-  }
-
-  @Override
-  public void setTimelineServiceAddress(String address) {
-    this.timelineServiceAddress = address;
-  }
-
   private String getTimelineServiceAddress() {
     return this.timelineServiceAddress;
   }
@@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
           public Token<TimelineDelegationTokenIdentifier> run()
               throws Exception {
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // TODO we should add retry logic here if timelineServiceAddress is
             // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                constructResURI(getConfig(),
-                    getTimelineServiceAddress(), false).toURL(),
+                TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
                 token, renewer, doAsUser);
           }
         };
-    return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
+    return (Token<TimelineDelegationTokenIdentifier>) connector
+        .operateDelegationToken(getDTAction);
   }
 
   @SuppressWarnings("unchecked")
@@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
               token.setDelegationToken((Token) timelineDT);
             }
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ?
-                constructResURI(getConfig(), getTimelineServiceAddress(), false)
+            final URI serviceURI = isTokenServiceAddrEmpty
+                ? TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR_V1, null, null);
+                    address.getPort(), RESOURCE_URI_STR_V1, null, null);
             return authUrl
                 .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
           }
         };
-    return (Long) operateDelegationToken(renewDTAction);
+    return (Long) connector.operateDelegationToken(renewDTAction);
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public void cancelDelegationToken(
       final Token<TimelineDelegationTokenIdentifier> timelineDT)
-          throws IOException, YarnException {
+      throws IOException, YarnException {
     final boolean isTokenServiceAddrEmpty =
         timelineDT.getService().toString().isEmpty();
     final String scheme = isTokenServiceAddrEmpty ? null
@@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
               token.setDelegationToken((Token) timelineDT);
             }
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ?
-                constructResURI(getConfig(), getTimelineServiceAddress(), false)
+            final URI serviceURI = isTokenServiceAddrEmpty
+                ? TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR_V1, null, null);
+                    address.getPort(), RESOURCE_URI_STR_V1, null, null);
             authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
             return null;
           }
         };
-    operateDelegationToken(cancelDTAction);
+    connector.operateDelegationToken(cancelDTAction);
   }
 
   @Override
   public String toString() {
     return super.toString() + " with timeline server "
-        + constructResURI(getConfig(), getTimelineServiceAddress(), false)
+        + TimelineConnector.constructResURI(getConfig(),
+            getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
         + " and writer " + timelineWriter;
   }
 
-  private Object operateDelegationToken(
-      final PrivilegedExceptionAction<?> action)
-      throws IOException, YarnException {
-    // Set up the retry operation
-    TimelineClientRetryOp tokenRetryOp =
-        createTimelineClientRetryOpForOperateDelegationToken(action);
-
-    return connectionRetry.retryOn(tokenRetryOp);
-  }
-
-  /**
-   * Poll TimelineServiceAddress for maximum of retries times if it is null.
-   *
-   * @param retries
-   * @return the left retry times
-   * @throws IOException
-   */
-  private int pollTimelineServiceAddress(int retries) throws YarnException {
-    while (timelineServiceAddress == null && retries > 0) {
-      try {
-        Thread.sleep(this.serviceRetryInterval);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new YarnException("Interrupted while trying to connect ATS");
-      }
-      retries--;
-    }
-    return retries;
-  }
-
-  private class TimelineURLConnectionFactory
-      implements HttpURLConnectionFactory {
-
-    @Override
-    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
-      authUgi.checkTGTAndReloginFromKeytab();
-      try {
-        return new DelegationTokenAuthenticatedURL(
-            authenticator, connConfigurator).openConnection(url, token,
-              doAsUser);
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      } catch (AuthenticationException ae) {
-        throw new IOException(ae);
-      }
-    }
-
-  }
-
-  private ConnectionConfigurator initConnConfigurator(Configuration conf) {
-    try {
-      return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
-    } catch (Exception e) {
-      LOG.debug("Cannot load customized ssl related configuration. " +
-          "Fallback to system-generic settings.", e);
-      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
-    }
-  }
-
-  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
-      new ConnectionConfigurator() {
-    @Override
-    public HttpURLConnection configure(HttpURLConnection conn)
-        throws IOException {
-      setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
-      return conn;
-    }
-  };
-
-  private ConnectionConfigurator initSslConnConfigurator(final int timeout,
-      Configuration conf) throws IOException, GeneralSecurityException {
-    final SSLSocketFactory sf;
-    final HostnameVerifier hv;
-
-    sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-    sslFactory.init();
-    sf = sslFactory.createSSLSocketFactory();
-    hv = sslFactory.getHostnameVerifier();
-
-    return new ConnectionConfigurator() {
-      @Override
-      public HttpURLConnection configure(HttpURLConnection conn)
-          throws IOException {
-        if (conn instanceof HttpsURLConnection) {
-          HttpsURLConnection c = (HttpsURLConnection) conn;
-          c.setSSLSocketFactory(sf);
-          c.setHostnameVerifier(hv);
-        }
-        setTimeouts(conn, timeout);
-        return conn;
-      }
-    };
-  }
-
-  private static void setTimeouts(URLConnection connection, int socketTimeout) {
-    connection.setConnectTimeout(socketTimeout);
-    connection.setReadTimeout(socketTimeout);
-  }
-
-  private static URI constructResURI(
-      Configuration conf, String address, boolean v2) {
-    return URI.create(
-        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
-            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
-  }
-
   public static void main(String[] argv) throws Exception {
     CommandLine cliParser = new GnuParser().parse(opts, argv);
     if (cliParser.hasOption("put")) {
@@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
   public void setTimelineWriter(TimelineWriter writer) {
     this.timelineWriter = writer;
   }
-
-  @Private
-  @VisibleForTesting
-  public TimelineClientRetryOp
-      createTimelineClientRetryOpForOperateDelegationToken(
-          final PrivilegedExceptionAction<?> action) throws IOException {
-    return new TimelineClientRetryOpForOperateDelegationToken(
-        this.authUgi, action);
-  }
-
-  @Private
-  @VisibleForTesting
-  public class TimelineClientRetryOpForOperateDelegationToken
-      extends TimelineClientRetryOp {
-
-    private final UserGroupInformation authUgi;
-    private final PrivilegedExceptionAction<?> action;
-
-    public TimelineClientRetryOpForOperateDelegationToken(
-        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
-      this.authUgi = authUgi;
-      this.action = action;
-    }
-
-    @Override
-    public Object run() throws IOException {
-      // Try pass the request, if fail, keep retrying
-      authUgi.checkTGTAndReloginFromKeytab();
-      try {
-        return authUgi.doAs(action);
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public boolean shouldRetryOn(Exception e) {
-      // retry on connection exceptions
-      // and SocketTimeoutException
-      return (e instanceof ConnectException
-          || e instanceof SocketTimeoutException);
-    }
-  }
-
-  private final class EntitiesHolder extends FutureTask<Void> {
-    private final
-        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-            entities;
-    private final boolean isSync;
-
-    EntitiesHolder(
-        final
-            org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-                entities,
-        final boolean isSync) {
-      super(new Callable<Void>() {
-        // publishEntities()
-        public Void call() throws Exception {
-          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
-          params.add("appid", getContextAppId().toString());
-          params.add("async", Boolean.toString(!isSync));
-          putObjects("entities", params, entities);
-          return null;
-        }
-      });
-      this.entities = entities;
-      this.isSync = isSync;
-    }
-
-    public boolean isSync() {
-      return isSync;
-    }
-
-    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-        getEntities() {
-      return entities;
-    }
-  }
-
-  /**
-   * This class is responsible for collecting the timeline entities and
-   * publishing them in async.
-   */
-  private class TimelineEntityDispatcher {
-    /**
-     * Time period for which the timelineclient will wait for draining after
-     * stop.
-     */
-    private static final long DRAIN_TIME_PERIOD = 2000L;
-
-    private int numberOfAsyncsToMerge;
-    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
-    private ExecutorService executor;
-
-    TimelineEntityDispatcher(Configuration conf) {
-      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
-      numberOfAsyncsToMerge =
-          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
-              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
-    }
-
-    Runnable createRunnable() {
-      return new Runnable() {
-        @Override
-        public void run() {
-          try {
-            EntitiesHolder entitiesHolder;
-            while (!Thread.currentThread().isInterrupted()) {
-              // Merge all the async calls and make one push, but if its sync
-              // call push immediately
-              try {
-                entitiesHolder = timelineEntityQueue.take();
-              } catch (InterruptedException ie) {
-                LOG.info("Timeline dispatcher thread was interrupted ");
-                Thread.currentThread().interrupt();
-                return;
-              }
-              if (entitiesHolder != null) {
-                publishWithoutBlockingOnQueue(entitiesHolder);
-              }
-            }
-          } finally {
-            if (!timelineEntityQueue.isEmpty()) {
-              LOG.info("Yet to publish " + timelineEntityQueue.size()
-                  + " timelineEntities, draining them now. ");
-            }
-            // Try to drain the remaining entities to be published @ the max for
-            // 2 seconds
-            long timeTillweDrain =
-                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
-            while (!timelineEntityQueue.isEmpty()) {
-              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
-              if (System.currentTimeMillis() > timeTillweDrain) {
-                // time elapsed stop publishing further....
-                if (!timelineEntityQueue.isEmpty()) {
-                  LOG.warn("Time to drain elapsed! Remaining "
-                      + timelineEntityQueue.size() + "timelineEntities will not"
-                      + " be published");
-                  // if some entities were not drained then we need interrupt
-                  // the threads which had put sync EntityHolders to the queue.
-                  EntitiesHolder nextEntityInTheQueue = null;
-                  while ((nextEntityInTheQueue =
-                      timelineEntityQueue.poll()) != null) {
-                    nextEntityInTheQueue.cancel(true);
-                  }
-                }
-                break;
-              }
-            }
-          }
-        }
-
-        /**
-         * Publishes the given EntitiesHolder and return immediately if sync
-         * call, else tries to fetch the EntitiesHolder from the queue in non
-         * blocking fashion and collate the Entities if possible before
-         * publishing through REST.
-         *
-         * @param entitiesHolder
-         */
-        private void publishWithoutBlockingOnQueue(
-            EntitiesHolder entitiesHolder) {
-          if (entitiesHolder.isSync()) {
-            entitiesHolder.run();
-            return;
-          }
-          int count = 1;
-          while (true) {
-            // loop till we find a sync put Entities or there is nothing
-            // to take
-            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
-            if (nextEntityInTheQueue == null) {
-              // Nothing in the queue just publish and get back to the
-              // blocked wait state
-              entitiesHolder.run();
-              break;
-            } else if (nextEntityInTheQueue.isSync()) {
-              // flush all the prev async entities first
-              entitiesHolder.run();
-              // and then flush the sync entity
-              nextEntityInTheQueue.run();
-              break;
-            } else {
-              // append all async entities together and then flush
-              entitiesHolder.getEntities().addEntities(
-                  nextEntityInTheQueue.getEntities().getEntities());
-              count++;
-              if (count == numberOfAsyncsToMerge) {
-                // Flush the entities if the number of the async
-                // putEntites merged reaches the desired limit. To avoid
-                // collecting multiple entities and delaying for a long
-                // time.
-                entitiesHolder.run();
-                break;
-              }
-            }
-          }
-        }
-      };
-    }
-
-    public void dispatchEntities(boolean sync,
-        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
-            entitiesTobePublished) throws YarnException {
-      if (executor.isShutdown()) {
-        throw new YarnException("Timeline client is in the process of stopping,"
-            + " not accepting any more TimelineEntities");
-      }
-
-      // wrap all TimelineEntity into TimelineEntities object
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-          entities =
-              new org.apache.hadoop.yarn.api.records.timelineservice.
-                  TimelineEntities();
-      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
-               entity : entitiesTobePublished) {
-        entities.addEntity(entity);
-      }
-
-      // created a holder and place it in queue
-      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
-      try {
-        timelineEntityQueue.put(entitiesHolder);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new YarnException(
-            "Failed while adding entity to the queue for publishing", e);
-      }
-
-      if (sync) {
-        // In sync call we need to wait till its published and if any error then
-        // throw it back
-        try {
-          entitiesHolder.get();
-        } catch (ExecutionException e) {
-          throw new YarnException("Failed while publishing entity",
-              e.getCause());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new YarnException("Interrupted while publishing entity", e);
-        }
-      }
-    }
-
-    public void start() {
-      executor = Executors.newSingleThreadExecutor();
-      executor.execute(createRunnable());
-    }
-
-    public void stop() {
-      LOG.info("Stopping TimelineClient.");
-      executor.shutdownNow();
-      try {
-        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        e.printStackTrace();
-      }
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org