You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2017/02/16 19:41:27 UTC

[2/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.

YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4fa1afdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4fa1afdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4fa1afdb

Branch: refs/heads/trunk
Commit: 4fa1afdb883dab8786d2fb5c72a195dd2e87d711
Parents: 5690b51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Feb 16 11:41:04 2017 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Feb 16 11:41:04 2017 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  57 +-
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    |  14 +-
 .../v2/app/rm/RMContainerAllocator.java         |   4 +-
 .../jobhistory/TestJobHistoryEventHandler.java  |   8 +-
 .../distributedshell/ApplicationMaster.java     |  98 ++-
 .../hadoop/yarn/client/api/AMRMClient.java      |  40 +-
 .../yarn/client/api/async/AMRMClientAsync.java  |  21 +-
 .../api/async/impl/AMRMClientAsyncImpl.java     |   5 +-
 .../yarn/client/api/impl/YarnClientImpl.java    |  15 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |  94 +--
 .../yarn/client/api/TimelineV2Client.java       |  92 +++
 .../client/api/impl/TimelineClientImpl.java     | 825 ++-----------------
 .../yarn/client/api/impl/TimelineConnector.java | 440 ++++++++++
 .../client/api/impl/TimelineV2ClientImpl.java   | 459 +++++++++++
 .../client/api/impl/TestTimelineClient.java     |  39 +-
 .../api/impl/TestTimelineClientV2Impl.java      |   4 +-
 .../timelineservice/NMTimelinePublisher.java    |  22 +-
 .../TestNMTimelinePublisher.java                |  10 +-
 .../TestTimelineServiceClientIntegration.java   |  10 +-
 19 files changed, 1272 insertions(+), 985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 0cc605c..285d36e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientHandlerException;
 
@@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
  */
 public class JobHistoryEventHandler extends AbstractService
     implements EventHandler<JobHistoryEvent> {
-  private static final JsonNodeFactory FACTORY =
-      new ObjectMapper().getNodeFactory();
 
   private final AppContext context;
   private final int startCount;
@@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService
   // should job completion be force when the AM shuts down?
   protected volatile boolean forceJobCompletion = false;
 
+  @VisibleForTesting
   protected TimelineClient timelineClient;
-
-  private boolean timelineServiceV2Enabled = false;
+  @VisibleForTesting
+  protected TimelineV2Client timelineV2Client;
 
   private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
   private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService
         MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
       LOG.info("Emitting job history data to the timeline service is enabled");
       if (YarnConfiguration.timelineServiceEnabled(conf)) {
-
-        timelineClient =
-            ((MRAppMaster.RunningAppContext)context).getTimelineClient();
-        timelineClient.init(conf);
-        timelineServiceV2Enabled =
-            YarnConfiguration.timelineServiceV2Enabled(conf);
+        boolean timelineServiceV2Enabled =
+            ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+        if(timelineServiceV2Enabled) {
+          timelineV2Client =
+              ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
+          timelineV2Client.init(conf);
+        } else {
+          timelineClient =
+              ((MRAppMaster.RunningAppContext) context).getTimelineClient();
+          timelineClient.init(conf);
+        }
         LOG.info("Timeline service is enabled; version: " +
             YarnConfiguration.getTimelineServiceVersion(conf));
       } else {
@@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService
   protected void serviceStart() throws Exception {
     if (timelineClient != null) {
       timelineClient.start();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.start();
     }
     eventHandlingThread = new Thread(new Runnable() {
       @Override
@@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService
     }
     if (timelineClient != null) {
       timelineClient.stop();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.stop();
     }
     LOG.info("Stopped JobHistoryEventHandler. super.stop()");
     super.serviceStop();
@@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService
         }
         processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
             event.getJobID());
-        if (timelineClient != null) {
-          if (timelineServiceV2Enabled) {
-            processEventForNewTimelineService(historyEvent, event.getJobID(),
-                event.getTimestamp());
-          } else {
-            processEventForTimelineServer(historyEvent, event.getJobID(),
-                event.getTimestamp());
-          }
+        if (timelineV2Client != null) {
+          processEventForNewTimelineService(historyEvent, event.getJobID(),
+              event.getTimestamp());
+        } else if (timelineClient != null) {
+          processEventForTimelineServer(historyEvent, event.getJobID(),
+              event.getTimestamp());
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("In HistoryEventHandler "
@@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService
         configSize += size;
         if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
           if (jobEntityForConfigs.getConfigs().size() > 0) {
-            timelineClient.putEntities(jobEntityForConfigs);
-            timelineClient.putEntities(appEntityForConfigs);
+            timelineV2Client.putEntities(jobEntityForConfigs);
+            timelineV2Client.putEntities(appEntityForConfigs);
             jobEntityForConfigs = createJobEntity(jobId);
             appEntityForConfigs = new ApplicationEntity();
             appEntityForConfigs.setId(appId);
@@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService
         appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
       }
       if (configSize > 0) {
-        timelineClient.putEntities(jobEntityForConfigs);
-        timelineClient.putEntities(appEntityForConfigs);
+        timelineV2Client.putEntities(jobEntityForConfigs);
+        timelineV2Client.putEntities(appEntityForConfigs);
       }
     } catch (IOException | YarnException e) {
       LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
@@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService
     }
     try {
       if (appEntityWithJobMetrics == null) {
-        timelineClient.putEntitiesAsync(tEntity);
+        timelineV2Client.putEntitiesAsync(tEntity);
       } else {
-        timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
+        timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
       }
     } catch (IOException | YarnException e) {
       LOG.error("Failed to process Event " + event.getEventType()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 835c0aa..12df83d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.crypto.KeyGenerator;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import javax.crypto.KeyGenerator;
-
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService {
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
     private TimelineClient timelineClient = null;
+    private TimelineV2Client timelineV2Client = null;
 
     private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
 
@@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService {
 
         if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
           // create new version TimelineClient
-          timelineClient = TimelineClient.createTimelineClient(
+          timelineV2Client = TimelineV2Client.createTimelineClient(
               appAttemptID.getApplicationId());
         } else {
           timelineClient = TimelineClient.createTimelineClient();
@@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService {
       return taskAttemptFinishingMonitor;
     }
 
-    // Get Timeline Collector's address (get sync from RM)
     public TimelineClient getTimelineClient() {
       return timelineClient;
     }
+
+    // Get Timeline Collector's address (get sync from RM)
+    public TimelineV2Client getTimelineV2Client() {
+      return timelineV2Client;
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 31bc380..1f88a2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     MRAppMaster.RunningAppContext appContext =
         (MRAppMaster.RunningAppContext)this.getContext();
     if (collectorAddr != null && !collectorAddr.isEmpty()
-        && appContext.getTimelineClient() != null) {
-      appContext.getTimelineClient().setTimelineServiceAddress(
+        && appContext.getTimelineV2Client() != null) {
+      appContext.getTimelineV2Client().setTimelineServiceAddress(
           response.getCollectorAddr());
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 0b33d6b..6c5e604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -29,8 +29,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler {
     if (mockContext instanceof RunningAppContext) {
       when(((RunningAppContext)mockContext).getTimelineClient()).
           thenReturn(TimelineClient.createTimelineClient());
+      when(((RunningAppContext) mockContext).getTimelineV2Client())
+          .thenReturn(TimelineV2Client
+              .createTimelineClient(ApplicationId.newInstance(0, 1)));
     }
     return mockContext;
   }
@@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
   protected void serviceStart() {
     if (timelineClient != null) {
       timelineClient.start();
+    } else if (timelineV2Client != null) {
+      timelineV2Client.start();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 5a06ef6..4daebb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -219,7 +220,9 @@ public class ApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
-  private boolean timelineServiceV2 = false;
+  private boolean timelineServiceV2Enabled = false;
+
+  private boolean timelineServiceV1Enabled = false;
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -293,6 +296,10 @@ public class ApplicationMaster {
   // Timeline Client
   @VisibleForTesting
   TimelineClient timelineClient;
+
+  // Timeline v2 Client
+  private TimelineV2Client timelineV2Client;
+
   static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
   static final String APPID_TIMELINE_FILTER_NAME = "appId";
   static final String USER_TIMELINE_FILTER_NAME = "user";
@@ -556,9 +563,12 @@ public class ApplicationMaster {
         "container_retry_interval", "0"));
 
     if (YarnConfiguration.timelineServiceEnabled(conf)) {
-      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
+      timelineServiceV2Enabled =
+          ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+      timelineServiceV1Enabled = !timelineServiceV2Enabled;
     } else {
       timelineClient = null;
+      timelineV2Client = null;
       LOG.warn("Timeline service is not enabled");
     }
 
@@ -621,18 +631,17 @@ public class ApplicationMaster {
     nmClientAsync.start();
 
     startTimelineClient(conf);
-    if (timelineServiceV2) {
+    if (timelineServiceV2Enabled) {
       // need to bind timelineClient
-      amRMClient.registerTimelineClient(timelineClient);
+      amRMClient.registerTimelineV2Client(timelineV2Client);
     }
-    if(timelineClient != null) {
-      if (timelineServiceV2) {
-        publishApplicationAttemptEventOnTimelineServiceV2(
-            DSEvent.DS_APP_ATTEMPT_START);
-      } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
-      }
+
+    if (timelineServiceV2Enabled) {
+      publishApplicationAttemptEventOnTimelineServiceV2(
+          DSEvent.DS_APP_ATTEMPT_START);
+    } else if (timelineServiceV1Enabled) {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
     }
 
     // Setup local RPC Server to accept status requests directly from clients
@@ -704,18 +713,21 @@ public class ApplicationMaster {
         public Void run() throws Exception {
           if (YarnConfiguration.timelineServiceEnabled(conf)) {
             // Creating the Timeline Client
-            if (timelineServiceV2) {
-              timelineClient = TimelineClient.createTimelineClient(
+            if (timelineServiceV2Enabled) {
+              timelineV2Client = TimelineV2Client.createTimelineClient(
                   appAttemptID.getApplicationId());
+              timelineV2Client.init(conf);
+              timelineV2Client.start();
               LOG.info("Timeline service V2 client is enabled");
             } else {
               timelineClient = TimelineClient.createTimelineClient();
+              timelineClient.init(conf);
+              timelineClient.start();
               LOG.info("Timeline service V1 client is enabled");
             }
-            timelineClient.init(conf);
-            timelineClient.start();
           } else {
             timelineClient = null;
+            timelineV2Client = null;
             LOG.warn("Timeline service is not enabled");
           }
           return null;
@@ -741,14 +753,12 @@ public class ApplicationMaster {
       } catch (InterruptedException ex) {}
     }
 
-    if (timelineClient != null) {
-      if (timelineServiceV2) {
-        publishApplicationAttemptEventOnTimelineServiceV2(
-            DSEvent.DS_APP_ATTEMPT_END);
-      } else {
-        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
-      }
+    if (timelineServiceV2Enabled) {
+      publishApplicationAttemptEventOnTimelineServiceV2(
+          DSEvent.DS_APP_ATTEMPT_END);
+    } else if (timelineServiceV1Enabled) {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
     }
 
     // Join all launched threads
@@ -797,8 +807,10 @@ public class ApplicationMaster {
     amRMClient.stop();
 
     // Stop Timeline Client
-    if(timelineClient != null) {
+    if(timelineServiceV1Enabled) {
       timelineClient.stop();
+    } else if (timelineServiceV2Enabled) {
+      timelineV2Client.stop();
     }
 
     return success;
@@ -853,16 +865,14 @@ public class ApplicationMaster {
           LOG.info("Container completed successfully." + ", containerId="
               + containerStatus.getContainerId());
         }
-        if(timelineClient != null) {
-          if (timelineServiceV2) {
-            publishContainerEndEventOnTimelineServiceV2(containerStatus);
-          } else {
-            publishContainerEndEvent(
-                timelineClient, containerStatus, domainId, appSubmitterUgi);
-          }
+        if (timelineServiceV2Enabled) {
+          publishContainerEndEventOnTimelineServiceV2(containerStatus);
+        } else if (timelineServiceV1Enabled) {
+          publishContainerEndEvent(timelineClient, containerStatus, domainId,
+              appSubmitterUgi);
         }
       }
-      
+
       // ask for more containers if any failed
       int askCount = numTotalContainers - numRequestedContainers.get();
       numRequestedContainers.addAndGet(askCount);
@@ -983,15 +993,13 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(
             containerId, container.getNodeId());
       }
-      if(applicationMaster.timelineClient != null) {
-        if (applicationMaster.timelineServiceV2) {
-          applicationMaster.publishContainerStartEventOnTimelineServiceV2(
-              container);
-        } else {
-          applicationMaster.publishContainerStartEvent(
-              applicationMaster.timelineClient, container,
-              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
-        }
+      if (applicationMaster.timelineServiceV2Enabled) {
+        applicationMaster
+            .publishContainerStartEventOnTimelineServiceV2(container);
+      } else if (applicationMaster.timelineServiceV1Enabled) {
+        applicationMaster.publishContainerStartEvent(
+            applicationMaster.timelineClient, container,
+            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
       }
     }
 
@@ -1371,7 +1379,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntities(entity);
+          timelineV2Client.putEntities(entity);
           return null;
         }
       });
@@ -1404,7 +1412,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntities(entity);
+          timelineV2Client.putEntities(entity);
           return null;
         }
       });
@@ -1438,7 +1446,7 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public TimelinePutResponse run() throws Exception {
-          timelineClient.putEntitiesAsync(entity);
+          timelineV2Client.putEntitiesAsync(entity);
           return null;
         }
       });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 15d0065..69f3777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
 
-  private TimelineClient timelineClient;
+  private TimelineV2Client timelineV2Client;
+  private boolean timelineServiceV2Enabled;
 
   /**
    * Create a new instance of AMRMClient.
@@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     nmTokenCache = NMTokenCache.getSingleton();
   }
 
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
+  }
+
   /**
    * Object to represent a single container request for resources. Scheduler
    * documentation should be consulted for the specifics of how the parameters
@@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
   }
 
   /**
-   * Register TimelineClient to AMRMClient.
-   * @param client the timeline client to register
+   * Register TimelineV2Client to AMRMClient. Writer's address for the timeline
+   * V2 client will be updated dynamically if registered.
+   *
+   * @param client the timeline v2 client to register
+   * @throws YarnException when this method is invoked even when ATS V2 is not
+   *           configured.
    */
-  public void registerTimelineClient(TimelineClient client) {
-    this.timelineClient = client;
+  public void registerTimelineV2Client(TimelineV2Client client)
+      throws YarnException {
+    if (timelineServiceV2Enabled) {
+      timelineV2Client = client;
+    } else {
+      LOG.error("Trying to register timeline v2 client when not configured.");
+      throw new YarnException(
+          "register timeline v2 client when not configured.");
+    }
   }
 
   /**
-   * Get registered timeline client.
-   * @return the registered timeline client
+   * Get registered timeline v2 client.
+   * @return the registered timeline v2 client
    */
-  public TimelineClient getRegisteredTimelineClient() {
-    return this.timelineClient;
+  public TimelineV2Client getRegisteredTimelineV2Client() {
+    return this.timelineV2Client;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 4cb27cd..1ecfe1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.client.api.async;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 
 /**
  * <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -346,17 +346,20 @@ extends AbstractService {
   /**
    * Register TimelineClient to AMRMClient.
    * @param timelineClient
+   * @throws YarnException when this method is invoked even when ATS V2 is not
+   *           configured.
    */
-  public void registerTimelineClient(TimelineClient timelineClient) {
-    client.registerTimelineClient(timelineClient);
+  public void registerTimelineV2Client(TimelineV2Client timelineClient)
+      throws YarnException {
+    client.registerTimelineV2Client(timelineClient);
   }
 
   /**
    * Get registered timeline client.
    * @return the registered timeline client
    */
-  public TimelineClient getRegisteredTimelineClient() {
-    return client.getRegisteredTimelineClient();
+  public TimelineV2Client getRegisteredTimelineV2Client() {
+    return client.getRegisteredTimelineV2Client();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 9e2c0e5..6711da2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -326,7 +326,8 @@ extends AMRMClientAsync<T> {
 
           AllocateResponse response = (AllocateResponse) object;
           String collectorAddress = response.getCollectorAddr();
-          TimelineClient timelineClient = client.getRegisteredTimelineClient();
+          TimelineV2Client timelineClient =
+              client.getRegisteredTimelineV2Client();
           if (timelineClient != null && collectorAddress != null
               && !collectorAddress.isEmpty()) {
             if (collectorAddr == null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e406862..4a27fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
   Text timelineService;
   @VisibleForTesting
   String timelineDTRenewer;
-  protected boolean timelineServiceEnabled;
+  private boolean timelineV1ServiceEnabled;
   protected boolean timelineServiceBestEffort;
 
   private static final String ROOT = "root";
@@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
         YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
     }
 
+    float timelineServiceVersion =
+        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
-      timelineServiceEnabled = true;
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+        && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
+            || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+      timelineV1ServiceEnabled = true;
       timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
       timelineService = TimelineUtils.buildTimelineTokenService(conf);
     }
@@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
     // TimelineServer which means we are able to get history information
     // for applications/applicationAttempts/containers by using ahsClient
     // when the TimelineServer is running.
-    if (timelineServiceEnabled || conf.getBoolean(
+    if (timelineV1ServiceEnabled || conf.getBoolean(
         YarnConfiguration.APPLICATION_HISTORY_ENABLED,
         YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
       historyServiceEnabled = true;
@@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
 
     // Automatically add the timeline DT into the CLC
     // Only when the security and the timeline service are both enabled
-    if (isSecurityEnabled() && timelineServiceEnabled) {
+    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
       addTimelineDelegationToken(appContext.getAMContainerSpec());
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index cc76718..4835239 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
 import java.io.Flushable;
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 
 /**
  * A client library that can be used to post some information in terms of a
- * number of conceptual entities.
+ * number of conceptual entities. This client library needs to be used along
+ * with Timeline V.1.x server versions.
+ * Refer {@link TimelineV2Client} for ATS V2 interface.
  */
 @Public
 @Evolving
-public abstract class TimelineClient extends AbstractService implements
+public abstract class TimelineClient extends CompositeService implements
     Flushable {
 
   /**
-   * Create a timeline client. The current UGI when the user initialize the
-   * client will be used to do the put and the delegation token operations. The
-   * current user may use {@link UserGroupInformation#doAs} another user to
-   * construct and initialize a timeline client if the following operations are
-   * supposed to be conducted by that user.
-   */
-  private ApplicationId contextAppId;
-
-  /**
    * Creates an instance of the timeline v.1.x client.
+   * The current UGI when the user initialize the client will be used to do the
+   * put and the delegation token operations. The current user may use
+   * {@link UserGroupInformation#doAs} another user to construct and initialize
+   * a timeline client if the following operations are supposed to be conducted
+   * by that user.
    *
    * @return the created timeline client instance
    */
@@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
     return client;
   }
 
-  /**
-   * Creates an instance of the timeline v.2 client.
-   *
-   * @param appId the application id with which the timeline client is
-   * associated
-   * @return the created timeline client instance
-   */
-  @Public
-  public static TimelineClient createTimelineClient(ApplicationId appId) {
-    TimelineClient client = new TimelineClientImpl(appId);
-    return client;
-  }
-
-  @Private
-  protected TimelineClient(String name, ApplicationId appId) {
+  protected TimelineClient(String name) {
     super(name);
-    setContextAppId(appId);
   }
 
   /**
@@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
   public abstract void cancelDelegationToken(
       Token<TimelineDelegationTokenIdentifier> timelineDT)
           throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Send the information of a number of conceptual entities to the timeline
-   * service v.2 collector. It is a blocking API. The method will not return
-   * until all the put entities have been persisted. If this method is invoked
-   * for a non-v.2 timeline client instance, a YarnException is thrown.
-   * </p>
-   *
-   * @param entities the collection of {@link
-   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
-   * @throws IOException
-   * @throws YarnException
-   */
-  @Public
-  public abstract void putEntities(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Send the information of a number of conceptual entities to the timeline
-   * service v.2 collector. It is an asynchronous API. The method will return
-   * once all the entities are received. If this method is invoked for a
-   * non-v.2 timeline client instance, a YarnException is thrown.
-   * </p>
-   *
-   * @param entities the collection of {@link
-   * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
-   * @throws IOException
-   * @throws YarnException
-   */
-  @Public
-  public abstract void putEntitiesAsync(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException;
-
-  /**
-   * <p>
-   * Update the timeline service address where the request will be sent to.
-   * </p>
-   * @param address
-   *          the timeline service address
-   */
-  public abstract void setTimelineServiceAddress(String address);
-
-  protected ApplicationId getContextAppId() {
-    return contextAppId;
-  }
-
-  protected void setContextAppId(ApplicationId appId) {
-    this.contextAppId = appId;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
new file mode 100644
index 0000000..32cf1e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities. This client library needs to be used along
+ * with time line v.2 server version.
+ * Refer {@link TimelineClient} for ATS V1 interface.
+ */
+public abstract class TimelineV2Client extends CompositeService {
+  /**
+   * Creates an instance of the timeline v.2 client.
+   *
+   * @param appId the application id with which the timeline client is
+   *          associated
+   * @return the created timeline client instance
+   */
+  @Public
+  public static TimelineV2Client createTimelineClient(ApplicationId appId) {
+    TimelineV2Client client = new TimelineV2ClientImpl(appId);
+    return client;
+  }
+
+  protected TimelineV2Client(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is a blocking API. The method will not return
+   * until all the put entities have been persisted.
+   * </p>
+   *
+   * @param entities the collection of {@link TimelineEntity}
+   * @throws IOException  if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
+   */
+  @Public
+  public abstract void putEntities(TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * service v.2 collector. It is an asynchronous API. The method will return
+   * once all the entities are received.
+   * </p>
+   *
+   * @param entities the collection of {@link TimelineEntity}
+   * @throws IOException  if there are I/O errors
+   * @throws YarnException if entities are incomplete/invalid
+   */
+  @Public
+  public abstract void putEntitiesAsync(TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Update the timeline service address where the request will be sent to.
+   * </p>
+   *
+   * @param address the timeline service address
+   */
+  public abstract void setTimelineServiceAddress(String address);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 4506c48..f49618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientRequest;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.client.filter.ClientFilter;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @Private
 @Evolving
@@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient {
   private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
-  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
-  private static final Joiner JOINER = Joiner.on("");
-  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
 
   private static Options opts;
   private static final String ENTITY_DATA_TYPE = "entity";
@@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
     opts.addOption("help", false, "Print usage");
   }
 
-  private Client client;
-  private ConnectionConfigurator connConfigurator;
-  private DelegationTokenAuthenticator authenticator;
-  private DelegationTokenAuthenticatedURL.Token token;
-  private UserGroupInformation authUgi;
-  private String doAsUser;
-  private Configuration configuration;
-  private float timelineServiceVersion;
-  private TimelineWriter timelineWriter;
-  private SSLFactory sslFactory;
-
-  private volatile String timelineServiceAddress;
-
-  // Retry parameters for identifying new timeline service
-  // TODO consider to merge with connection retry
-  private int maxServiceRetries;
-  private long serviceRetryInterval;
-  private boolean timelineServiceV2 = false;
-
-  @Private
   @VisibleForTesting
-  TimelineClientConnectionRetry connectionRetry;
-
-  private TimelineEntityDispatcher entityDispatcher;
-
-  // Abstract class for an operation that should be retried by timeline client
-  @Private
+  protected DelegationTokenAuthenticatedURL.Token token;
   @VisibleForTesting
-  public static abstract class TimelineClientRetryOp {
-    // The operation that should be retried
-    public abstract Object run() throws IOException;
-    // The method to indicate if we should retry given the incoming exception
-    public abstract boolean shouldRetryOn(Exception e);
-  }
-
-  // Class to handle retry
-  // Outside this class, only visible to tests
-  @Private
+  protected UserGroupInformation authUgi;
   @VisibleForTesting
-  static class TimelineClientConnectionRetry {
-
-    // maxRetries < 0 means keep trying
-    @Private
-    @VisibleForTesting
-    public int maxRetries;
-
-    @Private
-    @VisibleForTesting
-    public long retryInterval;
-
-    // Indicates if retries happened last time. Only tests should read it.
-    // In unit tests, retryOn() calls should _not_ be concurrent.
-    private boolean retried = false;
+  protected String doAsUser;
 
-    @Private
-    @VisibleForTesting
-    boolean getRetired() {
-      return retried;
-    }
-
-    // Constructor with default retry settings
-    public TimelineClientConnectionRetry(Configuration conf) {
-      Preconditions.checkArgument(conf.getInt(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
-          "%s property value should be greater than or equal to -1",
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      Preconditions
-          .checkArgument(
-              conf.getLong(
-                  YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-                  YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
-              "%s property value should be greater than zero",
-              YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-      maxRetries = conf.getInt(
-        YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      retryInterval = conf.getLong(
-        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-    }
-
-    public Object retryOn(TimelineClientRetryOp op)
-        throws RuntimeException, IOException {
-      int leftRetries = maxRetries;
-      retried = false;
-
-      // keep trying
-      while (true) {
-        try {
-          // try perform the op, if fail, keep retrying
-          return op.run();
-        } catch (IOException | RuntimeException e) {
-          // break if there's no retries left
-          if (leftRetries == 0) {
-            break;
-          }
-          if (op.shouldRetryOn(e)) {
-            logException(e, leftRetries);
-          } else {
-            throw e;
-          }
-        }
-        if (leftRetries > 0) {
-          leftRetries--;
-        }
-        retried = true;
-        try {
-          // sleep for the given time interval
-          Thread.sleep(retryInterval);
-        } catch (InterruptedException ie) {
-          LOG.warn("Client retry sleep interrupted! ");
-        }
-      }
-      throw new RuntimeException("Failed to connect to timeline server. "
-          + "Connection retries limit exceeded. "
-          + "The posted timeline event may be missing");
-    };
-
-    private void logException(Exception e, int leftRetries) {
-      if (leftRetries > 0) {
-        LOG.info("Exception caught by TimelineClientConnectionRetry,"
-              + " will try " + leftRetries + " more time(s).\nMessage: "
-              + e.getMessage());
-      } else {
-        // note that maxRetries may be -1 at the very beginning
-        LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
-            + " will keep retrying.\nMessage: "
-            + e.getMessage());
-      }
-    }
-  }
+  private float timelineServiceVersion;
+  private TimelineWriter timelineWriter;
 
-  private class TimelineJerseyRetryFilter extends ClientFilter {
-    @Override
-    public ClientResponse handle(final ClientRequest cr)
-        throws ClientHandlerException {
-      // Set up the retry operation
-      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
-        @Override
-        public Object run() {
-          // Try pass the request, if fail, keep retrying
-          return getNext().handle(cr);
-        }
+  private String timelineServiceAddress;
 
-        @Override
-        public boolean shouldRetryOn(Exception e) {
-          // Only retry on connection exceptions
-          return (e instanceof ClientHandlerException)
-              && (e.getCause() instanceof ConnectException ||
-                  e.getCause() instanceof SocketTimeoutException);
-        }
-      };
-      try {
-        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
-      } catch (IOException e) {
-        throw new ClientHandlerException("Jersey retry failed!\nMessage: "
-              + e.getMessage());
-      }
-    }
-  }
+  @Private
+  @VisibleForTesting
+  TimelineConnector connector;
 
   public TimelineClientImpl() {
-    super(TimelineClientImpl.class.getName(), null);
-  }
-
-  public TimelineClientImpl(ApplicationId applicationId) {
-    super(TimelineClientImpl.class.getName(), applicationId);
-    this.timelineServiceV2 = true;
+    super(TimelineClientImpl.class.getName());
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
-    this.configuration = conf;
+    timelineServiceVersion =
+        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+    LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    if (!YarnConfiguration.timelineServiceEnabled(conf)
+        || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
+            || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+      throw new IOException("Timeline V1 client is not properly configured. "
+          + "Either timeline service is not enabled or version is not set to"
+          + " 1.x");
+    }
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUgi = ugi.getRealUser();
     if (realUgi != null) {
@@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
       authUgi = ugi;
       doAsUser = null;
     }
-    ClientConfig cc = new DefaultClientConfig();
-    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
-    connConfigurator = initConnConfigurator(conf);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      authenticator = new KerberosDelegationTokenAuthenticator();
-    } else {
-      authenticator = new PseudoDelegationTokenAuthenticator();
-    }
-    authenticator.setConnectionConfigurator(connConfigurator);
     token = new DelegationTokenAuthenticatedURL.Token();
+    connector = createTimelineConnector();
 
-    connectionRetry = new TimelineClientConnectionRetry(conf);
-    client = new Client(new URLConnectionClientHandler(
-        new TimelineURLConnectionFactory()), cc);
-    TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    // TODO need to cleanup filter retry later.
-    if (!timelineServiceV2) {
-      client.addFilter(retryFilter);
-    }
-
-    // old version timeline service need to get address from configuration
-    // while new version need to auto discovery (with retry).
-    if (timelineServiceV2) {
-      maxServiceRetries = conf.getInt(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
-      serviceRetryInterval = conf.getLong(
-          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
-      entityDispatcher = new TimelineEntityDispatcher(conf);
+    if (YarnConfiguration.useHttps(conf)) {
+      timelineServiceAddress =
+          conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
     } else {
-      if (YarnConfiguration.useHttps(conf)) {
-        setTimelineServiceAddress(conf.get(
-            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
-      } else {
-        setTimelineServiceAddress(conf.get(
-            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
-      }
-      timelineServiceVersion =
-          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
-      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+      timelineServiceAddress =
+          conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
     }
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  protected TimelineConnector createTimelineConnector() {
+    TimelineConnector newConnector =
+        new TimelineConnector(true, authUgi, doAsUser, token);
+    addIfService(newConnector);
+    return newConnector;
+  }
+
   @Override
   protected void serviceStart() throws Exception {
-    if (timelineServiceV2) {
-      entityDispatcher.start();
-    } else {
-      timelineWriter = createTimelineWriter(configuration, authUgi, client,
-          constructResURI(getConfig(), timelineServiceAddress, false));
-    }
+    timelineWriter = createTimelineWriter(getConfig(), authUgi,
+        connector.getClient(), TimelineConnector.constructResURI(getConfig(),
+            timelineServiceAddress, RESOURCE_URI_STR_V1));
   }
 
   protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
     if (this.timelineWriter != null) {
       this.timelineWriter.close();
     }
-    if (timelineServiceV2) {
-      entityDispatcher.stop();
-    }
-    if (this.sslFactory != null) {
-      this.sslFactory.destroy();
-    }
     super.serviceStop();
   }
 
@@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   @Override
-  public TimelinePutResponse putEntities(
-      TimelineEntity... entities) throws IOException, YarnException {
+  public TimelinePutResponse putEntities(TimelineEntity... entities)
+      throws IOException, YarnException {
     return timelineWriter.putEntities(entities);
   }
 
   @Override
-  public void putEntities(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException {
-    if (!timelineServiceV2) {
-      throw new YarnException("v.2 method is invoked on a v.1.x client");
-    }
-    entityDispatcher.dispatchEntities(true, entities);
-  }
-
-  @Override
-  public void putEntitiesAsync(
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
-          entities) throws IOException, YarnException {
-    if (!timelineServiceV2) {
-      throw new YarnException("v.2 method is invoked on a v.1.x client");
-    }
-    entityDispatcher.dispatchEntities(false, entities);
-  }
-
-  @Override
   public void putDomain(TimelineDomain domain) throws IOException,
       YarnException {
     timelineWriter.putDomain(domain);
   }
 
-  // Used for new timeline service only
-  @Private
-  protected void putObjects(String path, MultivaluedMap<String, String> params,
-      Object obj) throws IOException, YarnException {
-
-    int retries = verifyRestEndPointAvailable();
-
-    // timelineServiceAddress could be stale, add retry logic here.
-    boolean needRetry = true;
-    while (needRetry) {
-      try {
-        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
-        putObjects(uri, path, params, obj);
-        needRetry = false;
-      } catch (IOException e) {
-        // handle exception for timelineServiceAddress being updated.
-        checkRetryWithSleep(retries, e);
-        retries--;
-      }
-    }
-  }
-
-  private int verifyRestEndPointAvailable() throws YarnException {
-    // timelineServiceAddress could haven't be initialized yet
-    // or stale (only for new timeline service)
-    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
-    if (timelineServiceAddress == null) {
-      String errMessage = "TimelineClient has reached to max retry times : "
-          + this.maxServiceRetries
-          + ", but failed to fetch timeline service address. Please verify"
-          + " Timeline Auxiliary Service is configured in all the NMs";
-      LOG.error(errMessage);
-      throw new YarnException(errMessage);
-    }
-    return retries;
-  }
-
-  /**
-   * Check if reaching to maximum of retries.
-   * @param retries
-   * @param e
-   */
-  private void checkRetryWithSleep(int retries, IOException e)
-      throws YarnException, IOException {
-    if (retries > 0) {
-      try {
-        Thread.sleep(this.serviceRetryInterval);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        throw new YarnException("Interrupted while retrying to connect to ATS");
-      }
-    } else {
-      StringBuilder msg =
-          new StringBuilder("TimelineClient has reached to max retry times : ");
-      msg.append(this.maxServiceRetries);
-      msg.append(" for service address: ");
-      msg.append(timelineServiceAddress);
-      LOG.error(msg.toString());
-      throw new IOException(msg.toString(), e);
-    }
-  }
-
-  protected void putObjects(
-      URI base, String path, MultivaluedMap<String, String> params, Object obj)
-          throws IOException, YarnException {
-    ClientResponse resp;
-    try {
-      resp = client.resource(base).path(path).queryParams(params)
-          .accept(MediaType.APPLICATION_JSON)
-          .type(MediaType.APPLICATION_JSON)
-          .put(ClientResponse.class, obj);
-    } catch (RuntimeException re) {
-      // runtime exception is expected if the client cannot connect the server
-      String msg =
-          "Failed to get the response from the timeline server.";
-      LOG.error(msg, re);
-      throw new IOException(re);
-    }
-    if (resp == null ||
-        resp.getStatusInfo().getStatusCode() !=
-            ClientResponse.Status.OK.getStatusCode()) {
-      String msg = "Response from the timeline server is " +
-          ((resp == null) ? "null":
-          "not successful," + " HTTP error code: " + resp.getStatus()
-          + ", Server response:\n" + resp.getEntity(String.class));
-      LOG.error(msg);
-      throw new YarnException(msg);
-    }
-  }
-
-  @Override
-  public void setTimelineServiceAddress(String address) {
-    this.timelineServiceAddress = address;
-  }
-
   private String getTimelineServiceAddress() {
     return this.timelineServiceAddress;
   }
@@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
           public Token<TimelineDelegationTokenIdentifier> run()
               throws Exception {
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // TODO we should add retry logic here if timelineServiceAddress is
             // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                constructResURI(getConfig(),
-                    getTimelineServiceAddress(), false).toURL(),
+                TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
                 token, renewer, doAsUser);
           }
         };
-    return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
+    return (Token<TimelineDelegationTokenIdentifier>) connector
+        .operateDelegationToken(getDTAction);
   }
 
   @SuppressWarnings("unchecked")
@@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
               token.setDelegationToken((Token) timelineDT);
             }
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ?
-                constructResURI(getConfig(), getTimelineServiceAddress(), false)
+            final URI serviceURI = isTokenServiceAddrEmpty
+                ? TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR_V1, null, null);
+                    address.getPort(), RESOURCE_URI_STR_V1, null, null);
             return authUrl
                 .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
           }
         };
-    return (Long) operateDelegationToken(renewDTAction);
+    return (Long) connector.operateDelegationToken(renewDTAction);
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public void cancelDelegationToken(
       final Token<TimelineDelegationTokenIdentifier> timelineDT)
-          throws IOException, YarnException {
+      throws IOException, YarnException {
     final boolean isTokenServiceAddrEmpty =
         timelineDT.getService().toString().isEmpty();
     final String scheme = isTokenServiceAddrEmpty ? null
@@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
               token.setDelegationToken((Token) timelineDT);
             }
             DelegationTokenAuthenticatedURL authUrl =
-                new DelegationTokenAuthenticatedURL(authenticator,
-                    connConfigurator);
+                connector.getDelegationTokenAuthenticatedURL();
             // If the token service address is not available, fall back to use
             // the configured service address.
-            final URI serviceURI = isTokenServiceAddrEmpty ?
-                constructResURI(getConfig(), getTimelineServiceAddress(), false)
+            final URI serviceURI = isTokenServiceAddrEmpty
+                ? TimelineConnector.constructResURI(getConfig(),
+                    getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR_V1, null, null);
+                    address.getPort(), RESOURCE_URI_STR_V1, null, null);
             authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
             return null;
           }
         };
-    operateDelegationToken(cancelDTAction);
+    connector.operateDelegationToken(cancelDTAction);
   }
 
   @Override
   public String toString() {
     return super.toString() + " with timeline server "
-        + constructResURI(getConfig(), getTimelineServiceAddress(), false)
+        + TimelineConnector.constructResURI(getConfig(),
+            getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
         + " and writer " + timelineWriter;
   }
 
-  private Object operateDelegationToken(
-      final PrivilegedExceptionAction<?> action)
-      throws IOException, YarnException {
-    // Set up the retry operation
-    TimelineClientRetryOp tokenRetryOp =
-        createTimelineClientRetryOpForOperateDelegationToken(action);
-
-    return connectionRetry.retryOn(tokenRetryOp);
-  }
-
-  /**
-   * Poll TimelineServiceAddress for maximum of retries times if it is null.
-   *
-   * @param retries
-   * @return the left retry times
-   * @throws IOException
-   */
-  private int pollTimelineServiceAddress(int retries) throws YarnException {
-    while (timelineServiceAddress == null && retries > 0) {
-      try {
-        Thread.sleep(this.serviceRetryInterval);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new YarnException("Interrupted while trying to connect ATS");
-      }
-      retries--;
-    }
-    return retries;
-  }
-
-  private class TimelineURLConnectionFactory
-      implements HttpURLConnectionFactory {
-
-    @Override
-    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
-      authUgi.checkTGTAndReloginFromKeytab();
-      try {
-        return new DelegationTokenAuthenticatedURL(
-            authenticator, connConfigurator).openConnection(url, token,
-              doAsUser);
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      } catch (AuthenticationException ae) {
-        throw new IOException(ae);
-      }
-    }
-
-  }
-
-  private ConnectionConfigurator initConnConfigurator(Configuration conf) {
-    try {
-      return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
-    } catch (Exception e) {
-      LOG.debug("Cannot load customized ssl related configuration. " +
-          "Fallback to system-generic settings.", e);
-      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
-    }
-  }
-
-  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
-      new ConnectionConfigurator() {
-    @Override
-    public HttpURLConnection configure(HttpURLConnection conn)
-        throws IOException {
-      setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
-      return conn;
-    }
-  };
-
-  private ConnectionConfigurator initSslConnConfigurator(final int timeout,
-      Configuration conf) throws IOException, GeneralSecurityException {
-    final SSLSocketFactory sf;
-    final HostnameVerifier hv;
-
-    sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-    sslFactory.init();
-    sf = sslFactory.createSSLSocketFactory();
-    hv = sslFactory.getHostnameVerifier();
-
-    return new ConnectionConfigurator() {
-      @Override
-      public HttpURLConnection configure(HttpURLConnection conn)
-          throws IOException {
-        if (conn instanceof HttpsURLConnection) {
-          HttpsURLConnection c = (HttpsURLConnection) conn;
-          c.setSSLSocketFactory(sf);
-          c.setHostnameVerifier(hv);
-        }
-        setTimeouts(conn, timeout);
-        return conn;
-      }
-    };
-  }
-
-  private static void setTimeouts(URLConnection connection, int socketTimeout) {
-    connection.setConnectTimeout(socketTimeout);
-    connection.setReadTimeout(socketTimeout);
-  }
-
-  private static URI constructResURI(
-      Configuration conf, String address, boolean v2) {
-    return URI.create(
-        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
-            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
-  }
-
   public static void main(String[] argv) throws Exception {
     CommandLine cliParser = new GnuParser().parse(opts, argv);
     if (cliParser.hasOption("put")) {
@@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
   public void setTimelineWriter(TimelineWriter writer) {
     this.timelineWriter = writer;
   }
-
-  @Private
-  @VisibleForTesting
-  public TimelineClientRetryOp
-      createTimelineClientRetryOpForOperateDelegationToken(
-          final PrivilegedExceptionAction<?> action) throws IOException {
-    return new TimelineClientRetryOpForOperateDelegationToken(
-        this.authUgi, action);
-  }
-
-  @Private
-  @VisibleForTesting
-  public class TimelineClientRetryOpForOperateDelegationToken
-      extends TimelineClientRetryOp {
-
-    private final UserGroupInformation authUgi;
-    private final PrivilegedExceptionAction<?> action;
-
-    public TimelineClientRetryOpForOperateDelegationToken(
-        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
-      this.authUgi = authUgi;
-      this.action = action;
-    }
-
-    @Override
-    public Object run() throws IOException {
-      // Try pass the request, if fail, keep retrying
-      authUgi.checkTGTAndReloginFromKeytab();
-      try {
-        return authUgi.doAs(action);
-      } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public boolean shouldRetryOn(Exception e) {
-      // retry on connection exceptions
-      // and SocketTimeoutException
-      return (e instanceof ConnectException
-          || e instanceof SocketTimeoutException);
-    }
-  }
-
-  private final class EntitiesHolder extends FutureTask<Void> {
-    private final
-        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-            entities;
-    private final boolean isSync;
-
-    EntitiesHolder(
-        final
-            org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-                entities,
-        final boolean isSync) {
-      super(new Callable<Void>() {
-        // publishEntities()
-        public Void call() throws Exception {
-          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
-          params.add("appid", getContextAppId().toString());
-          params.add("async", Boolean.toString(!isSync));
-          putObjects("entities", params, entities);
-          return null;
-        }
-      });
-      this.entities = entities;
-      this.isSync = isSync;
-    }
-
-    public boolean isSync() {
-      return isSync;
-    }
-
-    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-        getEntities() {
-      return entities;
-    }
-  }
-
-  /**
-   * This class is responsible for collecting the timeline entities and
-   * publishing them in async.
-   */
-  private class TimelineEntityDispatcher {
-    /**
-     * Time period for which the timelineclient will wait for draining after
-     * stop.
-     */
-    private static final long DRAIN_TIME_PERIOD = 2000L;
-
-    private int numberOfAsyncsToMerge;
-    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
-    private ExecutorService executor;
-
-    TimelineEntityDispatcher(Configuration conf) {
-      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
-      numberOfAsyncsToMerge =
-          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
-              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
-    }
-
-    Runnable createRunnable() {
-      return new Runnable() {
-        @Override
-        public void run() {
-          try {
-            EntitiesHolder entitiesHolder;
-            while (!Thread.currentThread().isInterrupted()) {
-              // Merge all the async calls and make one push, but if its sync
-              // call push immediately
-              try {
-                entitiesHolder = timelineEntityQueue.take();
-              } catch (InterruptedException ie) {
-                LOG.info("Timeline dispatcher thread was interrupted ");
-                Thread.currentThread().interrupt();
-                return;
-              }
-              if (entitiesHolder != null) {
-                publishWithoutBlockingOnQueue(entitiesHolder);
-              }
-            }
-          } finally {
-            if (!timelineEntityQueue.isEmpty()) {
-              LOG.info("Yet to publish " + timelineEntityQueue.size()
-                  + " timelineEntities, draining them now. ");
-            }
-            // Try to drain the remaining entities to be published @ the max for
-            // 2 seconds
-            long timeTillweDrain =
-                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
-            while (!timelineEntityQueue.isEmpty()) {
-              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
-              if (System.currentTimeMillis() > timeTillweDrain) {
-                // time elapsed stop publishing further....
-                if (!timelineEntityQueue.isEmpty()) {
-                  LOG.warn("Time to drain elapsed! Remaining "
-                      + timelineEntityQueue.size() + "timelineEntities will not"
-                      + " be published");
-                  // if some entities were not drained then we need interrupt
-                  // the threads which had put sync EntityHolders to the queue.
-                  EntitiesHolder nextEntityInTheQueue = null;
-                  while ((nextEntityInTheQueue =
-                      timelineEntityQueue.poll()) != null) {
-                    nextEntityInTheQueue.cancel(true);
-                  }
-                }
-                break;
-              }
-            }
-          }
-        }
-
-        /**
-         * Publishes the given EntitiesHolder and return immediately if sync
-         * call, else tries to fetch the EntitiesHolder from the queue in non
-         * blocking fashion and collate the Entities if possible before
-         * publishing through REST.
-         *
-         * @param entitiesHolder
-         */
-        private void publishWithoutBlockingOnQueue(
-            EntitiesHolder entitiesHolder) {
-          if (entitiesHolder.isSync()) {
-            entitiesHolder.run();
-            return;
-          }
-          int count = 1;
-          while (true) {
-            // loop till we find a sync put Entities or there is nothing
-            // to take
-            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
-            if (nextEntityInTheQueue == null) {
-              // Nothing in the queue just publish and get back to the
-              // blocked wait state
-              entitiesHolder.run();
-              break;
-            } else if (nextEntityInTheQueue.isSync()) {
-              // flush all the prev async entities first
-              entitiesHolder.run();
-              // and then flush the sync entity
-              nextEntityInTheQueue.run();
-              break;
-            } else {
-              // append all async entities together and then flush
-              entitiesHolder.getEntities().addEntities(
-                  nextEntityInTheQueue.getEntities().getEntities());
-              count++;
-              if (count == numberOfAsyncsToMerge) {
-                // Flush the entities if the number of the async
-                // putEntites merged reaches the desired limit. To avoid
-                // collecting multiple entities and delaying for a long
-                // time.
-                entitiesHolder.run();
-                break;
-              }
-            }
-          }
-        }
-      };
-    }
-
-    public void dispatchEntities(boolean sync,
-        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
-            entitiesTobePublished) throws YarnException {
-      if (executor.isShutdown()) {
-        throw new YarnException("Timeline client is in the process of stopping,"
-            + " not accepting any more TimelineEntities");
-      }
-
-      // wrap all TimelineEntity into TimelineEntities object
-      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
-          entities =
-              new org.apache.hadoop.yarn.api.records.timelineservice.
-                  TimelineEntities();
-      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
-               entity : entitiesTobePublished) {
-        entities.addEntity(entity);
-      }
-
-      // created a holder and place it in queue
-      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
-      try {
-        timelineEntityQueue.put(entitiesHolder);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new YarnException(
-            "Failed while adding entity to the queue for publishing", e);
-      }
-
-      if (sync) {
-        // In sync call we need to wait till its published and if any error then
-        // throw it back
-        try {
-          entitiesHolder.get();
-        } catch (ExecutionException e) {
-          throw new YarnException("Failed while publishing entity",
-              e.getCause());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new YarnException("Interrupted while publishing entity", e);
-        }
-      }
-    }
-
-    public void start() {
-      executor = Executors.newSingleThreadExecutor();
-      executor.execute(createRunnable());
-    }
-
-    public void stop() {
-      LOG.info("Stopping TimelineClient.");
-      executor.shutdownNow();
-      try {
-        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        e.printStackTrace();
-      }
-    }
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org