You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/04/26 08:25:54 UTC

[3/4] hive git commit: HIVE-13431. Improvements to LLAPTaskReporter - event throttling. (Siddharth Seth, reviewed by Prasanth Jayachandran)

HIVE-13431. Improvements to LLAPTaskReporter - event throttling. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5665e34
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5665e34
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5665e34

Branch: refs/heads/master
Commit: f5665e34de5a6c6496c82bc59d3b5afaa612ab50
Parents: 0cf2244
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Apr 25 23:18:45 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Apr 25 23:18:45 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapTaskReporter.java | 53 +++++++++++++++-----
 .../llap/daemon/impl/TaskRunnerCallable.java    |  4 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    |  6 +--
 .../TestFirstInFirstOutComparator.java          |  4 +-
 4 files changed, 48 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 08c6f27..dc4482e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -40,8 +40,10 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -192,7 +194,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
     @Override
     public Boolean call() throws Exception {
       // Heartbeat only for active tasks. Errors, etc will be reported directly.
-      while (!task.isTaskDone() && !task.hadFatalError()) {
+      while (!task.isTaskDone() && !task.wasErrorReported()) {
         ResponseWrapper response = heartbeat(null);
 
         if (response.shouldDie) {
@@ -217,7 +219,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
       int pendingEventCount = eventsToSend.size();
       if (pendingEventCount > 0) {
         // This is OK because the pending events will be sent via the succeeded/failed messages.
-        // TaskDone is set before taskSucceeded/taskFailed are sent out - which is what causes the
+        // TaskDone is set before taskSucceeded/taskTerminated are sent out - which is what causes the
         // thread to exit
         LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
       }
@@ -243,7 +245,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
       List<TezEvent> events = new ArrayList<TezEvent>();
       eventsToSend.drainTo(events);
 
-      if (!task.isTaskDone() && !task.hadFatalError()) {
+      if (!task.isTaskDone() && !task.wasErrorReported()) {
         boolean sendCounters = false;
         /**
          * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
@@ -262,8 +264,9 @@ public class LlapTaskReporter implements TaskReporterInterface {
       long requestId = requestCounter.incrementAndGet();
       int fromEventId = task.getNextFromEventId();
       int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+      int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
       TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
-          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEventsToGet);
+          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending heartbeat to AM, request=" + request);
       }
@@ -288,7 +291,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
       // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
       // are running using the same umbilical.
       int numEventsReceived = 0;
-      if (task.isTaskDone() || task.hadFatalError()) {
+      if (task.isTaskDone() || task.wasErrorReported()) {
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
           LOG.warn("Current task already complete, Ignoring all event in"
               + " heartbeat response, eventCount=" + response.getEvents().size());
@@ -372,6 +375,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
     /**
      * Sends out final events for task failure.
      * @param taskAttemptID
+     * @param isKilled
+     * @param taskFailureType
      * @param t
      * @param diagnostics
      * @param srcMeta
@@ -381,19 +386,33 @@ public class LlapTaskReporter implements TaskReporterInterface {
      * @throws TezException
      *           indicates an exception somewhere in the AM.
      */
-    private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+    private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled,
+                               TaskFailureType taskFailureType, Throwable t, String diagnostics,
                                EventMetaData srcMeta) throws IOException, TezException {
       // Ensure only one final event is ever sent.
       if (!finalEventQueued.getAndSet(true)) {
-        TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
+        List<TezEvent> tezEvents = new ArrayList<>();
         if (diagnostics == null) {
           diagnostics = ExceptionUtils.getStackTrace(t);
         } else {
           diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
         }
-        TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
-            srcMeta == null ? updateEventMetadata : srcMeta);
-        return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
+
+        if (isKilled) {
+          tezEvents.add(new TezEvent(new TaskAttemptKilledEvent(diagnostics),
+              srcMeta == null ? updateEventMetadata : srcMeta));
+        } else {
+          tezEvents.add(new TezEvent(new TaskAttemptFailedEvent(diagnostics,
+              taskFailureType),
+              srcMeta == null ? updateEventMetadata : srcMeta));
+        }
+        try {
+          tezEvents.add(new TezEvent(getStatusUpdateEvent(true), updateEventMetadata));
+        } catch (Exception e) {
+          // Counter may exceed limitation
+          LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out");
+        }
+        return !heartbeat(tezEvents).shouldDie;
       } else {
         LOG.warn("A final task state event has already been sent. Not sending again");
         return askedToDie.get();
@@ -434,9 +453,19 @@ public class LlapTaskReporter implements TaskReporterInterface {
   }
 
   @Override
-  public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+  public boolean taskFailed(TezTaskAttemptID tezTaskAttemptID, TaskFailureType taskFailureType,
+                            Throwable throwable, String diagnostics, EventMetaData srcMeta) throws
+      IOException, TezException {
+    return currentCallable
+        .taskTerminated(tezTaskAttemptID, false, taskFailureType, throwable, diagnostics, srcMeta);
+  }
+
+  @Override
+  public boolean taskKilled(TezTaskAttemptID tezTaskAttemptID, Throwable throwable,
+                            String diagnostics,
                             EventMetaData srcMeta) throws IOException, TezException {
-    return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
+    return currentCallable
+        .taskTerminated(tezTaskAttemptID, true, null, throwable, diagnostics, srcMeta);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index a1cfbb8..2a60123 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -170,7 +170,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     synchronized (this) {
       if (!shouldRunTask) {
         LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
       }
     }
 
@@ -237,7 +237,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       }
       if (taskRunner == null) {
         LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
       }
 
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 4d05c35..24f4442 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -170,14 +170,14 @@ public class TaskExecutorTestHelpers {
           }
         } catch (InterruptedException e) {
           wasInterrupted.set(true);
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         } finally {
           lock.unlock();
         }
         if (wasKilled.get()) {
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         } else {
-          return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+          return new TaskRunner2Result(EndReason.SUCCESS, null, null, false);
         }
       } finally {
         lock.lock();

http://git-wip-us.apache.org/repos/asf/hive/blob/f5665e34/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 73df985..08ee769 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -70,9 +70,9 @@ public class TestFirstInFirstOutComparator {
       try {
         Thread.sleep(workTime);
       } catch (InterruptedException e) {
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
       }
-      return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+      return new TaskRunner2Result(EndReason.SUCCESS, null, null, false);
     }
 
     @Override