You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2017/02/16 19:41:27 UTC
[2/2] hadoop git commit: YARN-4675. Reorganize TimelineClient and
TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed
by Naganarasimha G R.
YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4fa1afdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4fa1afdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4fa1afdb
Branch: refs/heads/trunk
Commit: 4fa1afdb883dab8786d2fb5c72a195dd2e87d711
Parents: 5690b51
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Feb 16 11:41:04 2017 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Feb 16 11:41:04 2017 -0800
----------------------------------------------------------------------
.../jobhistory/JobHistoryEventHandler.java | 57 +-
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 14 +-
.../v2/app/rm/RMContainerAllocator.java | 4 +-
.../jobhistory/TestJobHistoryEventHandler.java | 8 +-
.../distributedshell/ApplicationMaster.java | 98 ++-
.../hadoop/yarn/client/api/AMRMClient.java | 40 +-
.../yarn/client/api/async/AMRMClientAsync.java | 21 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 5 +-
.../yarn/client/api/impl/YarnClientImpl.java | 15 +-
.../hadoop/yarn/client/api/TimelineClient.java | 94 +--
.../yarn/client/api/TimelineV2Client.java | 92 +++
.../client/api/impl/TimelineClientImpl.java | 825 ++-----------------
.../yarn/client/api/impl/TimelineConnector.java | 440 ++++++++++
.../client/api/impl/TimelineV2ClientImpl.java | 459 +++++++++++
.../client/api/impl/TestTimelineClient.java | 39 +-
.../api/impl/TestTimelineClientV2Impl.java | 4 +-
.../timelineservice/NMTimelinePublisher.java | 22 +-
.../TestNMTimelinePublisher.java | 10 +-
.../TestTimelineServiceClientIntegration.java | 10 +-
19 files changed, 1272 insertions(+), 985 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 0cc605c..285d36e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -72,13 +72,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
@@ -90,8 +89,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
*/
public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
- private static final JsonNodeFactory FACTORY =
- new ObjectMapper().getNodeFactory();
private final AppContext context;
private final int startCount;
@@ -133,9 +130,10 @@ public class JobHistoryEventHandler extends AbstractService
// should job completion be force when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
+ @VisibleForTesting
protected TimelineClient timelineClient;
-
- private boolean timelineServiceV2Enabled = false;
+ @VisibleForTesting
+ protected TimelineV2Client timelineV2Client;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
@@ -268,12 +266,17 @@ public class JobHistoryEventHandler extends AbstractService
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) {
-
- timelineClient =
- ((MRAppMaster.RunningAppContext)context).getTimelineClient();
- timelineClient.init(conf);
- timelineServiceV2Enabled =
- YarnConfiguration.timelineServiceV2Enabled(conf);
+ boolean timelineServiceV2Enabled =
+ ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+ if(timelineServiceV2Enabled) {
+ timelineV2Client =
+ ((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
+ timelineV2Client.init(conf);
+ } else {
+ timelineClient =
+ ((MRAppMaster.RunningAppContext) context).getTimelineClient();
+ timelineClient.init(conf);
+ }
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
} else {
@@ -324,6 +327,8 @@ public class JobHistoryEventHandler extends AbstractService
protected void serviceStart() throws Exception {
if (timelineClient != null) {
timelineClient.start();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.start();
}
eventHandlingThread = new Thread(new Runnable() {
@Override
@@ -448,6 +453,8 @@ public class JobHistoryEventHandler extends AbstractService
}
if (timelineClient != null) {
timelineClient.stop();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.stop();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
@@ -605,14 +612,12 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
- if (timelineClient != null) {
- if (timelineServiceV2Enabled) {
- processEventForNewTimelineService(historyEvent, event.getJobID(),
- event.getTimestamp());
- } else {
- processEventForTimelineServer(historyEvent, event.getJobID(),
- event.getTimestamp());
- }
+ if (timelineV2Client != null) {
+ processEventForNewTimelineService(historyEvent, event.getJobID(),
+ event.getTimestamp());
+ } else if (timelineClient != null) {
+ processEventForTimelineServer(historyEvent, event.getJobID(),
+ event.getTimestamp());
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
@@ -1162,8 +1167,8 @@ public class JobHistoryEventHandler extends AbstractService
configSize += size;
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (jobEntityForConfigs.getConfigs().size() > 0) {
- timelineClient.putEntities(jobEntityForConfigs);
- timelineClient.putEntities(appEntityForConfigs);
+ timelineV2Client.putEntities(jobEntityForConfigs);
+ timelineV2Client.putEntities(appEntityForConfigs);
jobEntityForConfigs = createJobEntity(jobId);
appEntityForConfigs = new ApplicationEntity();
appEntityForConfigs.setId(appId);
@@ -1174,8 +1179,8 @@ public class JobHistoryEventHandler extends AbstractService
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
- timelineClient.putEntities(jobEntityForConfigs);
- timelineClient.putEntities(appEntityForConfigs);
+ timelineV2Client.putEntities(jobEntityForConfigs);
+ timelineV2Client.putEntities(appEntityForConfigs);
}
} catch (IOException | YarnException e) {
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
@@ -1295,9 +1300,9 @@ public class JobHistoryEventHandler extends AbstractService
}
try {
if (appEntityWithJobMetrics == null) {
- timelineClient.putEntitiesAsync(tEntity);
+ timelineV2Client.putEntitiesAsync(tEntity);
} else {
- timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
+ timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 835c0aa..12df83d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.crypto.KeyGenerator;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -141,6 +143,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -154,8 +157,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
-import javax.crypto.KeyGenerator;
-
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@@ -1066,6 +1067,7 @@ public class MRAppMaster extends CompositeService {
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null;
+ private TimelineV2Client timelineV2Client = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@@ -1081,7 +1083,7 @@ public class MRAppMaster extends CompositeService {
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// create new version TimelineClient
- timelineClient = TimelineClient.createTimelineClient(
+ timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
} else {
timelineClient = TimelineClient.createTimelineClient();
@@ -1177,10 +1179,14 @@ public class MRAppMaster extends CompositeService {
return taskAttemptFinishingMonitor;
}
- // Get Timeline Collector's address (get sync from RM)
public TimelineClient getTimelineClient() {
return timelineClient;
}
+
+ // Get Timeline Collector's address (get sync from RM)
+ public TimelineV2Client getTimelineV2Client() {
+ return timelineV2Client;
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 31bc380..1f88a2c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -882,8 +882,8 @@ public class RMContainerAllocator extends RMContainerRequestor
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (collectorAddr != null && !collectorAddr.isEmpty()
- && appContext.getTimelineClient() != null) {
- appContext.getTimelineClient().setTimelineServiceAddress(
+ && appContext.getTimelineV2Client() != null) {
+ appContext.getTimelineV2Client().setTimelineServiceAddress(
response.getCollectorAddr());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index 0b33d6b..6c5e604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -29,8 +29,8 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import org.apache.commons.logging.Log;
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -829,6 +830,9 @@ public class TestJobHistoryEventHandler {
if (mockContext instanceof RunningAppContext) {
when(((RunningAppContext)mockContext).getTimelineClient()).
thenReturn(TimelineClient.createTimelineClient());
+ when(((RunningAppContext) mockContext).getTimelineV2Client())
+ .thenReturn(TimelineV2Client
+ .createTimelineClient(ApplicationId.newInstance(0, 1)));
}
return mockContext;
}
@@ -937,6 +941,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
protected void serviceStart() {
if (timelineClient != null) {
timelineClient.start();
+ } else if (timelineV2Client != null) {
+ timelineV2Client.start();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 5a06ef6..4daebb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -219,7 +220,9 @@ public class ApplicationMaster {
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
- private boolean timelineServiceV2 = false;
+ private boolean timelineServiceV2Enabled = false;
+
+ private boolean timelineServiceV1Enabled = false;
// App Master configuration
// No. of containers to run shell command on
@@ -293,6 +296,10 @@ public class ApplicationMaster {
// Timeline Client
@VisibleForTesting
TimelineClient timelineClient;
+
+ // Timeline v2 Client
+ private TimelineV2Client timelineV2Client;
+
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId";
static final String USER_TIMELINE_FILTER_NAME = "user";
@@ -556,9 +563,12 @@ public class ApplicationMaster {
"container_retry_interval", "0"));
if (YarnConfiguration.timelineServiceEnabled(conf)) {
- timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
+ timelineServiceV2Enabled =
+ ((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
+ timelineServiceV1Enabled = !timelineServiceV2Enabled;
} else {
timelineClient = null;
+ timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
@@ -621,18 +631,17 @@ public class ApplicationMaster {
nmClientAsync.start();
startTimelineClient(conf);
- if (timelineServiceV2) {
+ if (timelineServiceV2Enabled) {
// need to bind timelineClient
- amRMClient.registerTimelineClient(timelineClient);
+ amRMClient.registerTimelineV2Client(timelineV2Client);
}
- if(timelineClient != null) {
- if (timelineServiceV2) {
- publishApplicationAttemptEventOnTimelineServiceV2(
- DSEvent.DS_APP_ATTEMPT_START);
- } else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
- }
+
+ if (timelineServiceV2Enabled) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_START);
+ } else if (timelineServiceV1Enabled) {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
}
// Setup local RPC Server to accept status requests directly from clients
@@ -704,18 +713,21 @@ public class ApplicationMaster {
public Void run() throws Exception {
if (YarnConfiguration.timelineServiceEnabled(conf)) {
// Creating the Timeline Client
- if (timelineServiceV2) {
- timelineClient = TimelineClient.createTimelineClient(
+ if (timelineServiceV2Enabled) {
+ timelineV2Client = TimelineV2Client.createTimelineClient(
appAttemptID.getApplicationId());
+ timelineV2Client.init(conf);
+ timelineV2Client.start();
LOG.info("Timeline service V2 client is enabled");
} else {
timelineClient = TimelineClient.createTimelineClient();
+ timelineClient.init(conf);
+ timelineClient.start();
LOG.info("Timeline service V1 client is enabled");
}
- timelineClient.init(conf);
- timelineClient.start();
} else {
timelineClient = null;
+ timelineV2Client = null;
LOG.warn("Timeline service is not enabled");
}
return null;
@@ -741,14 +753,12 @@ public class ApplicationMaster {
} catch (InterruptedException ex) {}
}
- if (timelineClient != null) {
- if (timelineServiceV2) {
- publishApplicationAttemptEventOnTimelineServiceV2(
- DSEvent.DS_APP_ATTEMPT_END);
- } else {
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
- }
+ if (timelineServiceV2Enabled) {
+ publishApplicationAttemptEventOnTimelineServiceV2(
+ DSEvent.DS_APP_ATTEMPT_END);
+ } else if (timelineServiceV1Enabled) {
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+ DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
}
// Join all launched threads
@@ -797,8 +807,10 @@ public class ApplicationMaster {
amRMClient.stop();
// Stop Timeline Client
- if(timelineClient != null) {
+ if(timelineServiceV1Enabled) {
timelineClient.stop();
+ } else if (timelineServiceV2Enabled) {
+ timelineV2Client.stop();
}
return success;
@@ -853,16 +865,14 @@ public class ApplicationMaster {
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
- if(timelineClient != null) {
- if (timelineServiceV2) {
- publishContainerEndEventOnTimelineServiceV2(containerStatus);
- } else {
- publishContainerEndEvent(
- timelineClient, containerStatus, domainId, appSubmitterUgi);
- }
+ if (timelineServiceV2Enabled) {
+ publishContainerEndEventOnTimelineServiceV2(containerStatus);
+ } else if (timelineServiceV1Enabled) {
+ publishContainerEndEvent(timelineClient, containerStatus, domainId,
+ appSubmitterUgi);
}
}
-
+
// ask for more containers if any failed
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
@@ -983,15 +993,13 @@ public class ApplicationMaster {
applicationMaster.nmClientAsync.getContainerStatusAsync(
containerId, container.getNodeId());
}
- if(applicationMaster.timelineClient != null) {
- if (applicationMaster.timelineServiceV2) {
- applicationMaster.publishContainerStartEventOnTimelineServiceV2(
- container);
- } else {
- applicationMaster.publishContainerStartEvent(
- applicationMaster.timelineClient, container,
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
- }
+ if (applicationMaster.timelineServiceV2Enabled) {
+ applicationMaster
+ .publishContainerStartEventOnTimelineServiceV2(container);
+ } else if (applicationMaster.timelineServiceV1Enabled) {
+ applicationMaster.publishContainerStartEvent(
+ applicationMaster.timelineClient, container,
+ applicationMaster.domainId, applicationMaster.appSubmitterUgi);
}
}
@@ -1371,7 +1379,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntities(entity);
+ timelineV2Client.putEntities(entity);
return null;
}
});
@@ -1404,7 +1412,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntities(entity);
+ timelineV2Client.putEntities(entity);
return null;
}
});
@@ -1438,7 +1446,7 @@ public class ApplicationMaster {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public TimelinePutResponse run() throws Exception {
- timelineClient.putEntitiesAsync(entity);
+ timelineV2Client.putEntitiesAsync(entity);
return null;
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 15d0065..69f3777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -41,12 +42,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.util.resource.Resources;
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
- private TimelineClient timelineClient;
+ private TimelineV2Client timelineV2Client;
+ private boolean timelineServiceV2Enabled;
/**
* Create a new instance of AMRMClient.
@@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
nmTokenCache = NMTokenCache.getSingleton();
}
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
+ }
+
/**
* Object to represent a single container request for resources. Scheduler
* documentation should be consulted for the specifics of how the parameters
@@ -682,19 +691,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
}
/**
- * Register TimelineClient to AMRMClient.
- * @param client the timeline client to register
+ * Register TimelineV2Client to AMRMClient. Writer's address for the timeline
+ * V2 client will be updated dynamically if registered.
+ *
+ * @param client the timeline v2 client to register
+ * @throws YarnException when this method is invoked even when ATS V2 is not
+ * configured.
*/
- public void registerTimelineClient(TimelineClient client) {
- this.timelineClient = client;
+ public void registerTimelineV2Client(TimelineV2Client client)
+ throws YarnException {
+ if (timelineServiceV2Enabled) {
+ timelineV2Client = client;
+ } else {
+ LOG.error("Trying to register timeline v2 client when not configured.");
+ throw new YarnException(
+ "register timeline v2 client when not configured.");
+ }
}
/**
- * Get registered timeline client.
- * @return the registered timeline client
+ * Get registered timeline v2 client.
+ * @return the registered timeline v2 client
*/
- public TimelineClient getRegisteredTimelineClient() {
- return this.timelineClient;
+ public TimelineV2Client getRegisteredTimelineV2Client() {
+ return this.timelineV2Client;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 4cb27cd..1ecfe1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.client.api.async;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
@@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
/**
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
@@ -346,17 +346,20 @@ extends AbstractService {
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
+ * @throws YarnException when this method is invoked even when ATS V2 is not
+ * configured.
*/
- public void registerTimelineClient(TimelineClient timelineClient) {
- client.registerTimelineClient(timelineClient);
+ public void registerTimelineV2Client(TimelineV2Client timelineClient)
+ throws YarnException {
+ client.registerTimelineV2Client(timelineClient);
}
/**
* Get registered timeline client.
* @return the registered timeline client
*/
- public TimelineClient getRegisteredTimelineClient() {
- return client.getRegisteredTimelineClient();
+ public TimelineV2Client getRegisteredTimelineV2Client() {
+ return client.getRegisteredTimelineV2Client();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 9e2c0e5..6711da2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
@@ -326,7 +326,8 @@ extends AMRMClientAsync<T> {
AllocateResponse response = (AllocateResponse) object;
String collectorAddress = response.getCollectorAddr();
- TimelineClient timelineClient = client.getRegisteredTimelineClient();
+ TimelineV2Client timelineClient =
+ client.getRegisteredTimelineV2Client();
if (timelineClient != null && collectorAddress != null
&& !collectorAddress.isEmpty()) {
if (collectorAddr == null
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index e406862..4a27fee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
Text timelineService;
@VisibleForTesting
String timelineDTRenewer;
- protected boolean timelineServiceEnabled;
+ private boolean timelineV1ServiceEnabled;
protected boolean timelineServiceBestEffort;
private static final String ROOT = "root";
@@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
+ float timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
- timelineServiceEnabled = true;
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+ && ((Float.compare(timelineServiceVersion, 1.0f) == 0)
+ || (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
+ timelineV1ServiceEnabled = true;
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
timelineService = TimelineUtils.buildTimelineTokenService(conf);
}
@@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
// TimelineServer which means we are able to get history information
// for applications/applicationAttempts/containers by using ahsClient
// when the TimelineServer is running.
- if (timelineServiceEnabled || conf.getBoolean(
+ if (timelineV1ServiceEnabled || conf.getBoolean(
YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
historyServiceEnabled = true;
@@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
- if (isSecurityEnabled() && timelineServiceEnabled) {
+ if (isSecurityEnabled() && timelineV1ServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index cc76718..4835239 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
import java.io.Flushable;
import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
@@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* A client library that can be used to post some information in terms of a
- * number of conceptual entities.
+ * number of conceptual entities. This client library needs to be used along
+ * with Timeline V.1.x server versions.
+ * Refer to {@link TimelineV2Client} for the ATS V2 interface.
*/
@Public
@Evolving
-public abstract class TimelineClient extends AbstractService implements
+public abstract class TimelineClient extends CompositeService implements
Flushable {
/**
- * Create a timeline client. The current UGI when the user initialize the
- * client will be used to do the put and the delegation token operations. The
- * current user may use {@link UserGroupInformation#doAs} another user to
- * construct and initialize a timeline client if the following operations are
- * supposed to be conducted by that user.
- */
- private ApplicationId contextAppId;
-
- /**
* Creates an instance of the timeline v.1.x client.
+ * The current UGI when the user initializes the client will be used to do the
+ * put and the delegation token operations. The current user may use
+ * {@link UserGroupInformation#doAs} another user to construct and initialize
+ * a timeline client if the following operations are supposed to be conducted
+ * by that user.
*
* @return the created timeline client instance
*/
@@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
return client;
}
- /**
- * Creates an instance of the timeline v.2 client.
- *
- * @param appId the application id with which the timeline client is
- * associated
- * @return the created timeline client instance
- */
- @Public
- public static TimelineClient createTimelineClient(ApplicationId appId) {
- TimelineClient client = new TimelineClientImpl(appId);
- return client;
- }
-
- @Private
- protected TimelineClient(String name, ApplicationId appId) {
+ protected TimelineClient(String name) {
super(name);
- setContextAppId(appId);
}
/**
@@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
public abstract void cancelDelegationToken(
Token<TimelineDelegationTokenIdentifier> timelineDT)
throws IOException, YarnException;
-
- /**
- * <p>
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is a blocking API. The method will not return
- * until all the put entities have been persisted. If this method is invoked
- * for a non-v.2 timeline client instance, a YarnException is thrown.
- * </p>
- *
- * @param entities the collection of {@link
- * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract void putEntities(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException;
-
- /**
- * <p>
- * Send the information of a number of conceptual entities to the timeline
- * service v.2 collector. It is an asynchronous API. The method will return
- * once all the entities are received. If this method is invoked for a
- * non-v.2 timeline client instance, a YarnException is thrown.
- * </p>
- *
- * @param entities the collection of {@link
- * org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract void putEntitiesAsync(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException;
-
- /**
- * <p>
- * Update the timeline service address where the request will be sent to.
- * </p>
- * @param address
- * the timeline service address
- */
- public abstract void setTimelineServiceAddress(String address);
-
- protected ApplicationId getContextAppId() {
- return contextAppId;
- }
-
- protected void setContextAppId(ApplicationId appId) {
- this.contextAppId = appId;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
new file mode 100644
index 0000000..32cf1e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineV2Client.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities. This client library needs to be used along
+ * with the timeline v.2 server version.
+ * Refer to {@link TimelineClient} for the ATS V1 interface.
+ */
+public abstract class TimelineV2Client extends CompositeService {
+ /**
+ * Creates an instance of the timeline v.2 client.
+ *
+ * @param appId the application id with which the timeline client is
+ * associated
+ * @return the created timeline client instance
+ */
+ @Public
+ public static TimelineV2Client createTimelineClient(ApplicationId appId) {
+ TimelineV2Client client = new TimelineV2ClientImpl(appId);
+ return client;
+ }
+
+ protected TimelineV2Client(String name) {
+ super(name);
+ }
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * service v.2 collector. It is a blocking API. The method will not return
+ * until all the put entities have been persisted.
+ * </p>
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putEntities(TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Send the information of a number of conceptual entities to the timeline
+ * service v.2 collector. It is an asynchronous API. The method will return
+ * once all the entities are received.
+ * </p>
+ *
+ * @param entities the collection of {@link TimelineEntity}
+ * @throws IOException if there are I/O errors
+ * @throws YarnException if entities are incomplete/invalid
+ */
+ @Public
+ public abstract void putEntitiesAsync(TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ * <p>
+ * Update the timeline service address where the request will be sent to.
+ * </p>
+ *
+ * @param address the timeline service address
+ */
+ public abstract void setTimelineServiceAddress(String address);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4fa1afdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 4506c48..f49618b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -20,32 +20,10 @@ package org.apache.hadoop.yarn.client.api.impl;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.net.URI;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
-import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -79,19 +50,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientRequest;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.client.filter.ClientFilter;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
@Private
@Evolving
@@ -100,9 +61,6 @@ public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
- private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
- private static final Joiner JOINER = Joiner.on("");
- public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
private static Options opts;
private static final String ENTITY_DATA_TYPE = "entity";
@@ -117,179 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
opts.addOption("help", false, "Print usage");
}
- private Client client;
- private ConnectionConfigurator connConfigurator;
- private DelegationTokenAuthenticator authenticator;
- private DelegationTokenAuthenticatedURL.Token token;
- private UserGroupInformation authUgi;
- private String doAsUser;
- private Configuration configuration;
- private float timelineServiceVersion;
- private TimelineWriter timelineWriter;
- private SSLFactory sslFactory;
-
- private volatile String timelineServiceAddress;
-
- // Retry parameters for identifying new timeline service
- // TODO consider to merge with connection retry
- private int maxServiceRetries;
- private long serviceRetryInterval;
- private boolean timelineServiceV2 = false;
-
- @Private
@VisibleForTesting
- TimelineClientConnectionRetry connectionRetry;
-
- private TimelineEntityDispatcher entityDispatcher;
-
- // Abstract class for an operation that should be retried by timeline client
- @Private
+ protected DelegationTokenAuthenticatedURL.Token token;
@VisibleForTesting
- public static abstract class TimelineClientRetryOp {
- // The operation that should be retried
- public abstract Object run() throws IOException;
- // The method to indicate if we should retry given the incoming exception
- public abstract boolean shouldRetryOn(Exception e);
- }
-
- // Class to handle retry
- // Outside this class, only visible to tests
- @Private
+ protected UserGroupInformation authUgi;
@VisibleForTesting
- static class TimelineClientConnectionRetry {
-
- // maxRetries < 0 means keep trying
- @Private
- @VisibleForTesting
- public int maxRetries;
-
- @Private
- @VisibleForTesting
- public long retryInterval;
-
- // Indicates if retries happened last time. Only tests should read it.
- // In unit tests, retryOn() calls should _not_ be concurrent.
- private boolean retried = false;
+ protected String doAsUser;
- @Private
- @VisibleForTesting
- boolean getRetired() {
- return retried;
- }
-
- // Constructor with default retry settings
- public TimelineClientConnectionRetry(Configuration conf) {
- Preconditions.checkArgument(conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
- "%s property value should be greater than or equal to -1",
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- Preconditions
- .checkArgument(
- conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
- "%s property value should be greater than zero",
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- maxRetries = conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- retryInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- }
-
- public Object retryOn(TimelineClientRetryOp op)
- throws RuntimeException, IOException {
- int leftRetries = maxRetries;
- retried = false;
-
- // keep trying
- while (true) {
- try {
- // try perform the op, if fail, keep retrying
- return op.run();
- } catch (IOException | RuntimeException e) {
- // break if there's no retries left
- if (leftRetries == 0) {
- break;
- }
- if (op.shouldRetryOn(e)) {
- logException(e, leftRetries);
- } else {
- throw e;
- }
- }
- if (leftRetries > 0) {
- leftRetries--;
- }
- retried = true;
- try {
- // sleep for the given time interval
- Thread.sleep(retryInterval);
- } catch (InterruptedException ie) {
- LOG.warn("Client retry sleep interrupted! ");
- }
- }
- throw new RuntimeException("Failed to connect to timeline server. "
- + "Connection retries limit exceeded. "
- + "The posted timeline event may be missing");
- };
-
- private void logException(Exception e, int leftRetries) {
- if (leftRetries > 0) {
- LOG.info("Exception caught by TimelineClientConnectionRetry,"
- + " will try " + leftRetries + " more time(s).\nMessage: "
- + e.getMessage());
- } else {
- // note that maxRetries may be -1 at the very beginning
- LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
- + " will keep retrying.\nMessage: "
- + e.getMessage());
- }
- }
- }
+ private float timelineServiceVersion;
+ private TimelineWriter timelineWriter;
- private class TimelineJerseyRetryFilter extends ClientFilter {
- @Override
- public ClientResponse handle(final ClientRequest cr)
- throws ClientHandlerException {
- // Set up the retry operation
- TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
- @Override
- public Object run() {
- // Try pass the request, if fail, keep retrying
- return getNext().handle(cr);
- }
+ private String timelineServiceAddress;
- @Override
- public boolean shouldRetryOn(Exception e) {
- // Only retry on connection exceptions
- return (e instanceof ClientHandlerException)
- && (e.getCause() instanceof ConnectException ||
- e.getCause() instanceof SocketTimeoutException);
- }
- };
- try {
- return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
- } catch (IOException e) {
- throw new ClientHandlerException("Jersey retry failed!\nMessage: "
- + e.getMessage());
- }
- }
- }
+ @Private
+ @VisibleForTesting
+ TimelineConnector connector;
public TimelineClientImpl() {
- super(TimelineClientImpl.class.getName(), null);
- }
-
- public TimelineClientImpl(ApplicationId applicationId) {
- super(TimelineClientImpl.class.getName(), applicationId);
- this.timelineServiceV2 = true;
+ super(TimelineClientImpl.class.getName());
}
protected void serviceInit(Configuration conf) throws Exception {
- this.configuration = conf;
+ timelineServiceVersion =
+ conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+ LOG.info("Timeline service address: " + getTimelineServiceAddress());
+ if (!YarnConfiguration.timelineServiceEnabled(conf)
+ || !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
+ || (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
+ throw new IOException("Timeline V1 client is not properly configured. "
+ + "Either timeline service is not enabled or version is not set to"
+ + " 1.x");
+ }
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
if (realUgi != null) {
@@ -299,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
authUgi = ugi;
doAsUser = null;
}
- ClientConfig cc = new DefaultClientConfig();
- cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
- connConfigurator = initConnConfigurator(conf);
- if (UserGroupInformation.isSecurityEnabled()) {
- authenticator = new KerberosDelegationTokenAuthenticator();
- } else {
- authenticator = new PseudoDelegationTokenAuthenticator();
- }
- authenticator.setConnectionConfigurator(connConfigurator);
token = new DelegationTokenAuthenticatedURL.Token();
+ connector = createTimelineConnector();
- connectionRetry = new TimelineClientConnectionRetry(conf);
- client = new Client(new URLConnectionClientHandler(
- new TimelineURLConnectionFactory()), cc);
- TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
- // TODO need to cleanup filter retry later.
- if (!timelineServiceV2) {
- client.addFilter(retryFilter);
- }
-
- // old version timeline service need to get address from configuration
- // while new version need to auto discovery (with retry).
- if (timelineServiceV2) {
- maxServiceRetries = conf.getInt(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
- serviceRetryInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
- entityDispatcher = new TimelineEntityDispatcher(conf);
+ if (YarnConfiguration.useHttps(conf)) {
+ timelineServiceAddress =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
} else {
- if (YarnConfiguration.useHttps(conf)) {
- setTimelineServiceAddress(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
- } else {
- setTimelineServiceAddress(conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
- }
- timelineServiceVersion =
- conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
- LOG.info("Timeline service address: " + getTimelineServiceAddress());
+ timelineServiceAddress =
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
super.serviceInit(conf);
}
+ @VisibleForTesting
+ protected TimelineConnector createTimelineConnector() {
+ TimelineConnector newConnector =
+ new TimelineConnector(true, authUgi, doAsUser, token);
+ addIfService(newConnector);
+ return newConnector;
+ }
+
@Override
protected void serviceStart() throws Exception {
- if (timelineServiceV2) {
- entityDispatcher.start();
- } else {
- timelineWriter = createTimelineWriter(configuration, authUgi, client,
- constructResURI(getConfig(), timelineServiceAddress, false));
- }
+ timelineWriter = createTimelineWriter(getConfig(), authUgi,
+ connector.getClient(), TimelineConnector.constructResURI(getConfig(),
+ timelineServiceAddress, RESOURCE_URI_STR_V1));
}
protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -373,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
if (this.timelineWriter != null) {
this.timelineWriter.close();
}
- if (timelineServiceV2) {
- entityDispatcher.stop();
- }
- if (this.sslFactory != null) {
- this.sslFactory.destroy();
- }
super.serviceStop();
}
@@ -390,132 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
}
@Override
- public TimelinePutResponse putEntities(
- TimelineEntity... entities) throws IOException, YarnException {
+ public TimelinePutResponse putEntities(TimelineEntity... entities)
+ throws IOException, YarnException {
return timelineWriter.putEntities(entities);
}
@Override
- public void putEntities(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException {
- if (!timelineServiceV2) {
- throw new YarnException("v.2 method is invoked on a v.1.x client");
- }
- entityDispatcher.dispatchEntities(true, entities);
- }
-
- @Override
- public void putEntitiesAsync(
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
- entities) throws IOException, YarnException {
- if (!timelineServiceV2) {
- throw new YarnException("v.2 method is invoked on a v.1.x client");
- }
- entityDispatcher.dispatchEntities(false, entities);
- }
-
- @Override
public void putDomain(TimelineDomain domain) throws IOException,
YarnException {
timelineWriter.putDomain(domain);
}
- // Used for new timeline service only
- @Private
- protected void putObjects(String path, MultivaluedMap<String, String> params,
- Object obj) throws IOException, YarnException {
-
- int retries = verifyRestEndPointAvailable();
-
- // timelineServiceAddress could be stale, add retry logic here.
- boolean needRetry = true;
- while (needRetry) {
- try {
- URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
- putObjects(uri, path, params, obj);
- needRetry = false;
- } catch (IOException e) {
- // handle exception for timelineServiceAddress being updated.
- checkRetryWithSleep(retries, e);
- retries--;
- }
- }
- }
-
- private int verifyRestEndPointAvailable() throws YarnException {
- // timelineServiceAddress could haven't be initialized yet
- // or stale (only for new timeline service)
- int retries = pollTimelineServiceAddress(this.maxServiceRetries);
- if (timelineServiceAddress == null) {
- String errMessage = "TimelineClient has reached to max retry times : "
- + this.maxServiceRetries
- + ", but failed to fetch timeline service address. Please verify"
- + " Timeline Auxiliary Service is configured in all the NMs";
- LOG.error(errMessage);
- throw new YarnException(errMessage);
- }
- return retries;
- }
-
- /**
- * Check if reaching to maximum of retries.
- * @param retries
- * @param e
- */
- private void checkRetryWithSleep(int retries, IOException e)
- throws YarnException, IOException {
- if (retries > 0) {
- try {
- Thread.sleep(this.serviceRetryInterval);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while retrying to connect to ATS");
- }
- } else {
- StringBuilder msg =
- new StringBuilder("TimelineClient has reached to max retry times : ");
- msg.append(this.maxServiceRetries);
- msg.append(" for service address: ");
- msg.append(timelineServiceAddress);
- LOG.error(msg.toString());
- throw new IOException(msg.toString(), e);
- }
- }
-
- protected void putObjects(
- URI base, String path, MultivaluedMap<String, String> params, Object obj)
- throws IOException, YarnException {
- ClientResponse resp;
- try {
- resp = client.resource(base).path(path).queryParams(params)
- .accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .put(ClientResponse.class, obj);
- } catch (RuntimeException re) {
- // runtime exception is expected if the client cannot connect the server
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg, re);
- throw new IOException(re);
- }
- if (resp == null ||
- resp.getStatusInfo().getStatusCode() !=
- ClientResponse.Status.OK.getStatusCode()) {
- String msg = "Response from the timeline server is " +
- ((resp == null) ? "null":
- "not successful," + " HTTP error code: " + resp.getStatus()
- + ", Server response:\n" + resp.getEntity(String.class));
- LOG.error(msg);
- throw new YarnException(msg);
- }
- }
-
- @Override
- public void setTimelineServiceAddress(String address) {
- this.timelineServiceAddress = address;
- }
-
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@@ -532,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
public Token<TimelineDelegationTokenIdentifier> run()
throws Exception {
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// TODO we should add retry logic here if timelineServiceAddress is
// not available immediately.
return (Token) authUrl.getDelegationToken(
- constructResURI(getConfig(),
- getTimelineServiceAddress(), false).toURL(),
+ TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
token, renewer, doAsUser);
}
};
- return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
+ return (Token<TimelineDelegationTokenIdentifier>) connector
+ .operateDelegationToken(getDTAction);
}
@SuppressWarnings("unchecked")
@@ -568,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
- final URI serviceURI = isTokenServiceAddrEmpty ?
- constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ final URI serviceURI = isTokenServiceAddrEmpty
+ ? TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR_V1, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
return authUrl
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
}
};
- return (Long) operateDelegationToken(renewDTAction);
+ return (Long) connector.operateDelegationToken(renewDTAction);
}
@SuppressWarnings("unchecked")
@Override
public void cancelDelegationToken(
final Token<TimelineDelegationTokenIdentifier> timelineDT)
- throws IOException, YarnException {
+ throws IOException, YarnException {
final boolean isTokenServiceAddrEmpty =
timelineDT.getService().toString().isEmpty();
final String scheme = isTokenServiceAddrEmpty ? null
@@ -607,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
token.setDelegationToken((Token) timelineDT);
}
DelegationTokenAuthenticatedURL authUrl =
- new DelegationTokenAuthenticatedURL(authenticator,
- connConfigurator);
+ connector.getDelegationTokenAuthenticatedURL();
// If the token service address is not available, fall back to use
// the configured service address.
- final URI serviceURI = isTokenServiceAddrEmpty ?
- constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ final URI serviceURI = isTokenServiceAddrEmpty
+ ? TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
: new URI(scheme, null, address.getHostName(),
- address.getPort(), RESOURCE_URI_STR_V1, null, null);
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
return null;
}
};
- operateDelegationToken(cancelDTAction);
+ connector.operateDelegationToken(cancelDTAction);
}
@Override
public String toString() {
return super.toString() + " with timeline server "
- + constructResURI(getConfig(), getTimelineServiceAddress(), false)
+ + TimelineConnector.constructResURI(getConfig(),
+ getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
+ " and writer " + timelineWriter;
}
- private Object operateDelegationToken(
- final PrivilegedExceptionAction<?> action)
- throws IOException, YarnException {
- // Set up the retry operation
- TimelineClientRetryOp tokenRetryOp =
- createTimelineClientRetryOpForOperateDelegationToken(action);
-
- return connectionRetry.retryOn(tokenRetryOp);
- }
-
- /**
- * Poll TimelineServiceAddress for maximum of retries times if it is null.
- *
- * @param retries
- * @return the left retry times
- * @throws IOException
- */
- private int pollTimelineServiceAddress(int retries) throws YarnException {
- while (timelineServiceAddress == null && retries > 0) {
- try {
- Thread.sleep(this.serviceRetryInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while trying to connect ATS");
- }
- retries--;
- }
- return retries;
- }
-
- private class TimelineURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
- authUgi.checkTGTAndReloginFromKeytab();
- try {
- return new DelegationTokenAuthenticatedURL(
- authenticator, connConfigurator).openConnection(url, token,
- doAsUser);
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (AuthenticationException ae) {
- throw new IOException(ae);
- }
- }
-
- }
-
- private ConnectionConfigurator initConnConfigurator(Configuration conf) {
- try {
- return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
- } catch (Exception e) {
- LOG.debug("Cannot load customized ssl related configuration. " +
- "Fallback to system-generic settings.", e);
- return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
- }
- }
-
- private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
- new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
- return conn;
- }
- };
-
- private ConnectionConfigurator initSslConnConfigurator(final int timeout,
- Configuration conf) throws IOException, GeneralSecurityException {
- final SSLSocketFactory sf;
- final HostnameVerifier hv;
-
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- sslFactory.init();
- sf = sslFactory.createSSLSocketFactory();
- hv = sslFactory.getHostnameVerifier();
-
- return new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection c = (HttpsURLConnection) conn;
- c.setSSLSocketFactory(sf);
- c.setHostnameVerifier(hv);
- }
- setTimeouts(conn, timeout);
- return conn;
- }
- };
- }
-
- private static void setTimeouts(URLConnection connection, int socketTimeout) {
- connection.setConnectTimeout(socketTimeout);
- connection.setReadTimeout(socketTimeout);
- }
-
- private static URI constructResURI(
- Configuration conf, String address, boolean v2) {
- return URI.create(
- JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
- address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
- }
-
public static void main(String[] argv) throws Exception {
CommandLine cliParser = new GnuParser().parse(opts, argv);
if (cliParser.hasOption("put")) {
@@ -870,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
public void setTimelineWriter(TimelineWriter writer) {
this.timelineWriter = writer;
}
-
- @Private
- @VisibleForTesting
- public TimelineClientRetryOp
- createTimelineClientRetryOpForOperateDelegationToken(
- final PrivilegedExceptionAction<?> action) throws IOException {
- return new TimelineClientRetryOpForOperateDelegationToken(
- this.authUgi, action);
- }
-
- @Private
- @VisibleForTesting
- public class TimelineClientRetryOpForOperateDelegationToken
- extends TimelineClientRetryOp {
-
- private final UserGroupInformation authUgi;
- private final PrivilegedExceptionAction<?> action;
-
- public TimelineClientRetryOpForOperateDelegationToken(
- UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
- this.authUgi = authUgi;
- this.action = action;
- }
-
- @Override
- public Object run() throws IOException {
- // Try pass the request, if fail, keep retrying
- authUgi.checkTGTAndReloginFromKeytab();
- try {
- return authUgi.doAs(action);
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public boolean shouldRetryOn(Exception e) {
- // retry on connection exceptions
- // and SocketTimeoutException
- return (e instanceof ConnectException
- || e instanceof SocketTimeoutException);
- }
- }
-
- private final class EntitiesHolder extends FutureTask<Void> {
- private final
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities;
- private final boolean isSync;
-
- EntitiesHolder(
- final
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities,
- final boolean isSync) {
- super(new Callable<Void>() {
- // publishEntities()
- public Void call() throws Exception {
- MultivaluedMap<String, String> params = new MultivaluedMapImpl();
- params.add("appid", getContextAppId().toString());
- params.add("async", Boolean.toString(!isSync));
- putObjects("entities", params, entities);
- return null;
- }
- });
- this.entities = entities;
- this.isSync = isSync;
- }
-
- public boolean isSync() {
- return isSync;
- }
-
- public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- getEntities() {
- return entities;
- }
- }
-
- /**
- * This class is responsible for collecting the timeline entities and
- * publishing them in async.
- */
- private class TimelineEntityDispatcher {
- /**
- * Time period for which the timelineclient will wait for draining after
- * stop.
- */
- private static final long DRAIN_TIME_PERIOD = 2000L;
-
- private int numberOfAsyncsToMerge;
- private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
- private ExecutorService executor;
-
- TimelineEntityDispatcher(Configuration conf) {
- timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
- numberOfAsyncsToMerge =
- conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
- YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
- }
-
- Runnable createRunnable() {
- return new Runnable() {
- @Override
- public void run() {
- try {
- EntitiesHolder entitiesHolder;
- while (!Thread.currentThread().isInterrupted()) {
- // Merge all the async calls and make one push, but if its sync
- // call push immediately
- try {
- entitiesHolder = timelineEntityQueue.take();
- } catch (InterruptedException ie) {
- LOG.info("Timeline dispatcher thread was interrupted ");
- Thread.currentThread().interrupt();
- return;
- }
- if (entitiesHolder != null) {
- publishWithoutBlockingOnQueue(entitiesHolder);
- }
- }
- } finally {
- if (!timelineEntityQueue.isEmpty()) {
- LOG.info("Yet to publish " + timelineEntityQueue.size()
- + " timelineEntities, draining them now. ");
- }
- // Try to drain the remaining entities to be published @ the max for
- // 2 seconds
- long timeTillweDrain =
- System.currentTimeMillis() + DRAIN_TIME_PERIOD;
- while (!timelineEntityQueue.isEmpty()) {
- publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
- if (System.currentTimeMillis() > timeTillweDrain) {
- // time elapsed stop publishing further....
- if (!timelineEntityQueue.isEmpty()) {
- LOG.warn("Time to drain elapsed! Remaining "
- + timelineEntityQueue.size() + "timelineEntities will not"
- + " be published");
- // if some entities were not drained then we need interrupt
- // the threads which had put sync EntityHolders to the queue.
- EntitiesHolder nextEntityInTheQueue = null;
- while ((nextEntityInTheQueue =
- timelineEntityQueue.poll()) != null) {
- nextEntityInTheQueue.cancel(true);
- }
- }
- break;
- }
- }
- }
- }
-
- /**
- * Publishes the given EntitiesHolder and return immediately if sync
- * call, else tries to fetch the EntitiesHolder from the queue in non
- * blocking fashion and collate the Entities if possible before
- * publishing through REST.
- *
- * @param entitiesHolder
- */
- private void publishWithoutBlockingOnQueue(
- EntitiesHolder entitiesHolder) {
- if (entitiesHolder.isSync()) {
- entitiesHolder.run();
- return;
- }
- int count = 1;
- while (true) {
- // loop till we find a sync put Entities or there is nothing
- // to take
- EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
- if (nextEntityInTheQueue == null) {
- // Nothing in the queue just publish and get back to the
- // blocked wait state
- entitiesHolder.run();
- break;
- } else if (nextEntityInTheQueue.isSync()) {
- // flush all the prev async entities first
- entitiesHolder.run();
- // and then flush the sync entity
- nextEntityInTheQueue.run();
- break;
- } else {
- // append all async entities together and then flush
- entitiesHolder.getEntities().addEntities(
- nextEntityInTheQueue.getEntities().getEntities());
- count++;
- if (count == numberOfAsyncsToMerge) {
- // Flush the entities if the number of the async
- // putEntites merged reaches the desired limit. To avoid
- // collecting multiple entities and delaying for a long
- // time.
- entitiesHolder.run();
- break;
- }
- }
- }
- }
- };
- }
-
- public void dispatchEntities(boolean sync,
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
- entitiesTobePublished) throws YarnException {
- if (executor.isShutdown()) {
- throw new YarnException("Timeline client is in the process of stopping,"
- + " not accepting any more TimelineEntities");
- }
-
- // wrap all TimelineEntity into TimelineEntities object
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
- entities =
- new org.apache.hadoop.yarn.api.records.timelineservice.
- TimelineEntities();
- for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
- entity : entitiesTobePublished) {
- entities.addEntity(entity);
- }
-
- // created a holder and place it in queue
- EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
- try {
- timelineEntityQueue.put(entitiesHolder);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException(
- "Failed while adding entity to the queue for publishing", e);
- }
-
- if (sync) {
- // In sync call we need to wait till its published and if any error then
- // throw it back
- try {
- entitiesHolder.get();
- } catch (ExecutionException e) {
- throw new YarnException("Failed while publishing entity",
- e.getCause());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new YarnException("Interrupted while publishing entity", e);
- }
- }
- }
-
- public void start() {
- executor = Executors.newSingleThreadExecutor();
- executor.execute(createRunnable());
- }
-
- public void stop() {
- LOG.info("Stopping TimelineClient.");
- executor.shutdownNow();
- try {
- executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org