You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:15 UTC

[06/50] [abbrv] tez git commit: TEZ-1701. ATS fixes to flush all history events and also using batching. (hitesh)

TEZ-1701. ATS fixes to flush all history events and also using batching. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a6c8006c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a6c8006c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a6c8006c

Branch: refs/heads/TEZ-8
Commit: a6c8006c37334099a8a77befa3b687113514a35c
Parents: 74c7f87
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Oct 30 07:54:45 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Oct 30 07:54:45 2014 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  19 ++-
 .../logging/ats/ATSHistoryLoggingService.java   | 131 +++++++++++++------
 .../ats/TestATSHistoryLoggingService.java       |  22 +++-
 3 files changed, 126 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 1919f5c..7cc0aa5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -733,7 +733,24 @@ public class TezConfiguration extends Configuration {
   public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
       TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
   public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
-      3000l;
+      -1;
+
+  /**
+   * Int value. Max no. of events to send in a single batch to ATS.
+   * Expert level setting.
+   */
+  public static final String YARN_ATS_MAX_EVENTS_PER_BATCH =
+      TEZ_PREFIX + "yarn.ats.max.events.per.batch";
+  public static final int YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT = 5;
+
+
+  /**
+   * Int value. Time, in milliseconds, to wait for an event before sending a batch to ATS.
+   * Expert level setting.
+   */
+  public static final String YARN_ATS_MAX_POLLING_TIME_PER_EVENT = TEZ_PREFIX
+      + "yarn.ats.max.polling.time.per.event.millis";
+  public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
 
   /**
    * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the 

http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index aa1dd06..cdfbf7c 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -19,12 +19,16 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
@@ -56,6 +60,10 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
   private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
   private long maxTimeToWaitOnShutdown;
+  private boolean waitForeverOnShutdown = false;
+
+  private int maxEventsPerBatch;
+  private long maxPollingTimeMillis;
 
   public ATSHistoryLoggingService() {
     super(ATSHistoryLoggingService.class.getName());
@@ -69,17 +77,29 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     maxTimeToWaitOnShutdown = conf.getLong(
         TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
         TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT);
+    maxEventsPerBatch = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH,
+        TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT);
+    maxPollingTimeMillis = conf.getInt(
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT,
+        TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT);
+    if (maxTimeToWaitOnShutdown < 0) {
+      waitForeverOnShutdown = true;
+    }
   }
 
   @Override
   public void serviceStart() {
     LOG.info("Starting ATSService");
     timelineClient.start();
+
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
-        DAGHistoryEvent event;
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+        boolean interrupted = false;
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()
+              && !interrupted) {
 
           // Log the size of the event-queue every so often.
           if (eventCounter != 0 && eventCounter % 1000 == 0) {
@@ -92,20 +112,22 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
             ++eventCounter;
           }
 
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.info("EventQueue take interrupted. Returning");
-            return;
-          }
-
           synchronized (lock) {
-            ++eventsProcessed;
             try {
-              handleEvent(event);
+              getEventBatch(events);
+            } catch (InterruptedException e) {
+              // Finish processing events and then return
+              interrupted = true;
+            }
+            if (events.isEmpty()) {
+              // Nothing arrived within the polling window: skip the empty ATS put.
+              continue;
+            }
+            eventsProcessed += events.size();
+            try {
+              handleEvents(events);
             } catch (Exception e) {
-              // TODO handle failures - treat as fatal or ignore?
-              LOG.warn("Error handling event", e);
+              LOG.warn("Error handling events", e);
             }
           }
         }
@@ -126,21 +144,37 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       if (!eventQueue.isEmpty()) {
         LOG.warn("ATSService being stopped"
             + ", eventQueueBacklog=" + eventQueue.size()
-            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown);
+            + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown
+            + ", waitForever=" + waitForeverOnShutdown);
         long startTime = appContext.getClock().getTime();
-        if (maxTimeToWaitOnShutdown > 0) {
-          long endTime = startTime + maxTimeToWaitOnShutdown;
-          while (endTime >= appContext.getClock().getTime()) {
-            DAGHistoryEvent event = eventQueue.poll();
-            if (event == null) {
-              break;
-            }
-            try {
-              handleEvent(event);
-            } catch (Exception e) {
-              LOG.warn("Error handling event", e);
-              break;
-            }
+        long endTime = startTime + maxTimeToWaitOnShutdown;
+        List<DAGHistoryEvent> events = new LinkedList<DAGHistoryEvent>();
+        boolean interrupted = false;
+        while (waitForeverOnShutdown || (endTime >= appContext.getClock().getTime())) {
+          try {
+            getEventBatch(events);
+          } catch (InterruptedException e) {
+            LOG.info("ATSService interrupted while shutting down. Exiting."
+                  + " EventQueueBacklog=" + eventQueue.size());
+            // Flush the events already dequeued into this batch, then leave the loop.
+            interrupted = true;
+          }
+          if (events.isEmpty()) {
+            LOG.info("Event queue empty, stopping ATS Service");
+            break;
+          }
+          try {
+            handleEvents(events);
+          } catch (Exception e) {
+            LOG.warn("Error handling event", e);
+            break;
           }
+          if (interrupted) {
+            break;
+          }
         }
+        if (interrupted) {
+          // Catching InterruptedException cleared the flag; restore it for the caller.
+          Thread.currentThread().interrupt();
+        }
       }
@@ -152,13 +176,33 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     timelineClient.stop();
   }
 
+  private void getEventBatch(List<DAGHistoryEvent> events) throws InterruptedException {
+    events.clear();
+    int counter = 0;
+    while (counter < maxEventsPerBatch) {
+      DAGHistoryEvent event = eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS);
+      if (event == null) {
+        break;
+      }
+      if (!isValidEvent(event)) {
+        continue;
+      }
+      ++counter;
+      events.add(event);
+      if (event.getHistoryEvent().getEventType().equals(HistoryEventType.DAG_SUBMITTED)) {
+        // Special case this as it might be a large payload
+        break;
+      }
+    }
+  }
+
+
   public void handle(DAGHistoryEvent event) {
     eventQueue.add(event);
   }
 
-  private void handleEvent(DAGHistoryEvent event) {
+  private boolean isValidEvent(DAGHistoryEvent event) {
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
-
     TezDAGID dagId = event.getDagID();
 
     if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
@@ -167,43 +211,52 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
       String dagName = dagSubmittedEvent.getDAGName();
       if (dagName != null
           && dagName.startsWith(
-              TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+          TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) {
         // Skip recording pre-warm DAG events
         skippedDAGs.add(dagId);
-        return;
+        return false;
       }
     }
     if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
       // Remove from set to keep size small
       // No more events should be seen after this point.
       if (skippedDAGs.remove(dagId)) {
-        return;
+        return false;
       }
     }
 
     if (dagId != null && skippedDAGs.contains(dagId)) {
       // Skip pre-warm DAGs
-      return;
+      return false;
+    }
+
+    return true;
+  }
+
+
+
+  private void handleEvents(List<DAGHistoryEvent> events) {
+    TimelineEntity[] entities = new TimelineEntity[events.size()];
+    for (int i = 0; i < events.size(); ++i) {
+      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(
+          events.get(i).getHistoryEvent());
     }
 
     try {
       TimelinePutResponse response =
-          timelineClient.putEntities(
-              HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
+          timelineClient.putEntities(entities);
       if (response != null
         && !response.getErrors().isEmpty()) {
         TimelinePutError err = response.getErrors().get(0);
         if (err.getErrorCode() != 0) {
-          LOG.warn("Could not post history event to ATS, eventType="
-              + eventType
+          LOG.warn("Could not post history events to ATS"
               + ", atsPutError=" + err.getErrorCode());
         }
       }
       // Do nothing additional, ATS client library should handle throttling
       // or auto-disable as needed
     } catch (Exception e) {
-      LOG.warn("Could not handle history event, eventType="
-          + eventType, e);
+      LOG.warn("Could not handle history events", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a6c8006c/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
index 7bd3b91..06bc065 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
@@ -32,10 +32,10 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,6 +45,7 @@ public class TestATSHistoryLoggingService {
   private AppContext appContext;
   private Configuration conf;
   private int atsInvokeCounter;
+  private int atsEntitiesCounter;
   private SystemClock clock = new SystemClock();
 
   @Before
@@ -56,14 +57,17 @@ public class TestATSHistoryLoggingService {
     conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
         1000l);
     atsInvokeCounter = 0;
+    atsEntitiesCounter = 0;
     atsHistoryLoggingService.init(conf);
     atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
     atsHistoryLoggingService.start();
     when(appContext.getClock()).thenReturn(clock);
-    when(atsHistoryLoggingService.timelineClient.putEntities(any(TimelineEntity.class))).thenAnswer(
+    when(atsHistoryLoggingService.timelineClient.putEntities(
+        Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
         new Answer<Object>() {
           @Override
           public Object answer(InvocationOnMock invocation) throws Throwable {
+            atsEntitiesCounter += invocation.getArguments().length;
             ++atsInvokeCounter;
             try {
               Thread.sleep(500);
@@ -89,20 +93,26 @@ public class TestATSHistoryLoggingService {
     DAGHistoryEvent historyEvent = new DAGHistoryEvent(tezDAGID,
         new DAGStartedEvent(tezDAGID, 1001l, "user1", "dagName1"));
 
-    for (int i = 0; i < 20; ++i) {
+    for (int i = 0; i < 100; ++i) {
       atsHistoryLoggingService.handle(historyEvent);
     }
 
     try {
-      Thread.sleep(2500l);
+      Thread.sleep(500l);
     } catch (InterruptedException e) {
       // Do nothing
     }
     atsHistoryLoggingService.stop();
 
-    Assert.assertTrue(atsInvokeCounter >= 4);
-    Assert.assertTrue(atsInvokeCounter < 10);
+    Assert.assertTrue("ATSEntitiesCounter = " + atsEntitiesCounter, atsEntitiesCounter >= 10);
+    Assert.assertTrue("ATSEntitiesCounter = " + atsEntitiesCounter, atsEntitiesCounter < 50);
+    Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter, atsInvokeCounter >= 1);
+    Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter, atsInvokeCounter <= 10);
 
+    Assert.assertTrue("ATSInvokeCounter = " + atsInvokeCounter
+            + ", ATSEntitiesCounter = " + atsEntitiesCounter,
+        atsEntitiesCounter >=
+            ((atsInvokeCounter-1) * TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT));
   }
 
 }