You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:15 UTC
[06/50] [abbrv] tez git commit: TEZ-1701. ATS fixes to flush all history events and also using batching. (hitesh)
TEZ-1701. ATS fixes to flush all history events and also using batching. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a6c8006c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a6c8006c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a6c8006c
Branch: refs/heads/TEZ-8
Commit: a6c8006c37334099a8a77befa3b687113514a35c
Parents: 74c7f87
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Oct 30 07:54:45 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Oct 30 07:54:45 2014 -0700
----------------------------------------------------------------------
.../apache/tez/dag/api/TezConfiguration.java | 19 ++-
.../logging/ats/ATSHistoryLoggingService.java | 131 +++++++++++++------
.../ats/TestATSHistoryLoggingService.java | 22 +++-
3 files changed, 126 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1919f5c..7cc0aa5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -733,7 +733,24 @@ public class TezConfiguration extends Configuration {
public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
- 3000l;
+ -1;
+
+ /**
+ * Int value. Maximum number of events to send in a single batch to ATS.
+ * Expert level setting.
+ */
+ public static final String YARN_ATS_MAX_EVENTS_PER_BATCH =
+ TEZ_PREFIX + "yarn.ats.max.events.per.batch";
+ public static final int YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT = 5;
+
+
+ /**
+ * Int value. Maximum time, in milliseconds, to wait for each event while assembling a batch; if the wait times out, the events collected so far are sent to ATS.
+ * Expert level setting.
+ */
+ public static final String YARN_ATS_MAX_POLLING_TIME_PER_EVENT = TEZ_PREFIX
+ + "yarn.ats.max.polling.time.per.event.millis";
+ public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
/**
* Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index aa1dd06..cdfbf7c 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -19,12 +19,16 @@
package org.apache.tez.dag.history.logging.ats;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -56,6 +60,10 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
private long maxTimeToWaitOnShutdown;
+ private boolean waitForeverOnShutdown = false;
+
+ private int maxEventsPerBatch;
+ private long maxPollingTimeMillis;
public ATSHistoryLoggingService() {
super(ATSHistoryLoggingService.class.getName());
@@ -69,17 +77,29 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
maxTimeToWaitOnShutdown = conf.getLong(
TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+ maxEventsPerBatch = conf.getInt(
+ TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH,
+ TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT);
+ maxPollingTimeMillis = conf.getInt(
+ TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT,
+ TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT);
+ if (maxTimeToWaitOnShutdown < 0) {
+ waitForeverOnShutdown = true;
+ }
}
@Override
public void serviceStart() {
LOG.info("Starting ATSService");
timelineClient.start();
+
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
- DAGHistoryEvent event;
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+ boolean interrupted = false;
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()
+ && !interrupted) {
// Log the size of the event-queue every so often.
if (eventCounter != 0 && eventCounter % 1000 == 0) {
@@ -92,20 +112,18 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
++eventCounter;
}
- try {
- event = eventQueue.take();
- } catch (InterruptedException e) {
- LOG.info("EventQueue take interrupted. Returning");
- return;
- }
-
synchronized (lock) {
- ++eventsProcessed;
try {
- handleEvent(event);
+ getEventBatch(events);
+ } catch (InterruptedException e) {
+ // Finish processing events and then return
+ interrupted = true;
+ }
+ eventsProcessed += events.size();
+ try {
+ handleEvents(events);
} catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
- LOG.warn("Error handling event", e);
+ LOG.warn("Error handling events", e);
}
}
}
@@ -126,21 +144,27 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
if (!eventQueue.isEmpty()) {
LOG.warn("ATSService being stopped"
+ ", eventQueueBacklog=" + eventQueue.size()
- + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+ + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown
+ + ", waitForever=" + waitForeverOnShutdown);
long startTime = appContext.getClock().getTime();
- if (maxTimeToWaitOnShutdown > 0) {
- long endTime = startTime + maxTimeToWaitOnShutdown;
- while (endTime >= appContext.getClock().getTime()) {
- DAGHistoryEvent event = eventQueue.poll();
- if (event == null) {
- break;
- }
- try {
- handleEvent(event);
- } catch (Exception e) {
- LOG.warn("Error handling event", e);
- break;
- }
+ long endTime = startTime + maxTimeToWaitOnShutdown;
+ List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+ while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) {
+ try {
+ getEventBatch(events);
+ } catch (InterruptedException e) {
+ LOG.info("ATSService interrupted while shutting down. Exiting."
+ + " EventQueueBacklog=" + eventQueue.size());
+ }
+ if (events.isEmpty()) {
+ LOG.info("Event queue empty, stopping ATS Service");
+ break;
+ }
+ try {
+ handleEvents(events);
+ } catch (Exception e) {
+ LOG.warn("Error handling event", e);
+ break;
}
}
}
@@ -152,13 +176,33 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
timelineClient.stop();
}
+ private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
+ events.clear();
+ int counter = 0;
+ while (counter < maxEventsPerBatch) {
+ DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS);
+ if (event == null) {
+ break;
+ }
+ if (!isValidEvent(event)) {
+ continue;
+ }
+ ++counter;
+ events.add(event);
+ if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
+ // End the batch at a DAG_SUBMITTED event, as it might carry a large payload
+ break;
+ }
+ }
+ }
+
+
public void handle(DAGHistoryEvent event) {
eventQueue.add(event);
}
- private void handleEvent(DAGHistoryEvent event) {
+ private boolean isValidEvent(DAGHistoryEvent event) {
HistoryEventType eventType = event.getHistoryEvent().getEventType();
-
TezDAGID dagId = event.getDagID();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
@@ -167,43 +211,52 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
String dagName = dagSubmittedEvent.getDAGName();
if (dagName != null
&& dagName.startsWith(
- TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+ TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
// Skip recording pre-warm DAG events
skippedDAGs.add(dagId);
- return;
+ return false;
}
}
if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
// Remove from set to keep size small
// No more events should be seen after this point.
if (skippedDAGs.remove(dagId)) {
- return;
+ return false;
}
}
if (dagId != null && skippedDAGs.contains(dagId)) {
// Skip pre-warm DAGs
- return;
+ return false;
+ }
+
+ return true;
+ }
+
+
+
+ private void handleEvents(List<DAGHistoryEvent> events) {
+ TimelineEntity[] entities = new TimelineEntity[events.size()];
+ for (int i = 0; i < events.size(); ++i) {
+ entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(
+ events.get(i).getHistoryEvent());
}
try {
TimelinePutResponse response =
- timelineClient.putEntities(
- HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+ timelineClient.putEntities(entities);
if (response != null
&& !response.getErrors().isEmpty()) {
TimelinePutError err = response.getErrors().get(0);
if (err.getErrorCode() != 0) {
- LOG.warn("Could not post history event to ATS, eventType="
- + eventType
+ LOG.warn("Could not post history events to ATS"
+ ", atsPutError=" + err.getErrorCode());
}
}
// Do nothing additional, ATS client library should handle throttling
// or auto-disable as needed
} catch (Exception e) {
- LOG.warn("Could not handle history event, eventType="
- + eventType, e);
+ LOG.warn("Could not handle history events", e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 7bd3b91..06bc065 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -32,10 +32,10 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -45,6 +45,7 @@ public class TestATSHistoryLoggingService {
private AppContext appContext;
private Configuration conf;
private int atsInvokeCounter;
+ private int atsEntitiesCounter;
private SystemClock clock = new SystemClock();
@Before
@@ -56,14 +57,17 @@ public class TestATSHistoryLoggingService {
conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
1000l);
atsInvokeCounter = 0;
+ atsEntitiesCounter = 0;
atsHistoryLoggingService.init(conf);
atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
atsHistoryLoggingService.start();
when(appContext.getClock()).thenReturn(clock);
- when(atsHistoryLoggingService.timelineClient.putEntities(any(TimelineEntity.class))).thenAnswer(
+ when(atsHistoryLoggingService.timelineClient.putEntities(
+ Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
+ atsEntitiesCounter += invocation.getArguments().length;
++atsInvokeCounter;
try {
Thread.sleep(500);
@@ -89,20 +93,26 @@ public class TestATSHistoryLoggingService {
DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
- for (int i = 0; i < 20; ++i) {
+ for (int i = 0; i < 100; ++i) {
atsHistoryLoggingService.handle(historyEvent);
}
try {
- Thread.sleep(2500l);
+ Thread.sleep(500l);
} catch (InterruptedException e) {
// Do nothing
}
atsHistoryLoggingService.stop();
- Assert.assertTrue(atsInvokeCounter >= 4);
- Assert.assertTrue(atsInvokeCounter < 10);
+ Assert.assertTrue("ATSEntitiesCounter = " + atsEntitiesCounter, atsEntitiesCounter >= 10);
+ Assert.assertTrue("ATSEntitiesCounter = " + atsEntitiesCounter, atsEntitiesCounter < 50);
+ Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter, atsInvokeCounter >= 1);
+ Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter, atsInvokeCounter <= 10);
+ Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter
+ + ", ATSEntitiesCounter = " + atsEntitiesCounter,
+ atsEntitiesCounter >=
+ ((atsInvokeCounter-1) * TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT));
}
}