You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:29 UTC

[23/50] [abbrv] git commit: TEZ-1533. Request Events more often if a complete set of events is received by a task. (sseth)

TEZ-1533. Request Events more often if a complete set of events is
received by a task. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8bd2ca35
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8bd2ca35
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8bd2ca35

Branch: refs/heads/branch-0.5
Commit: 8bd2ca352664fab8d248593f60faa6c0ff223920
Parents: 3b34c41
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 16 03:54:45 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 16 03:54:45 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/runtime/task/TaskReporter.java   |  53 ++++++---
 .../tez/runtime/task/TestTaskReporter.java      | 115 +++++++++++++++++++
 3 files changed, 151 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca4e7e2..83048bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -42,6 +42,7 @@ ALL CHANGES
   of DAG submission
   TEZ-1571. Add create method for DataSinkDescriptor.
   TEZ-1585. Memory leak in tez session mode.
+  TEZ-1533. Request Events more often if a complete set of events is received by a task.
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index fcb8778..15dcbb0 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -115,7 +115,8 @@ public class TaskReporter {
     heartbeatExecutor.shutdownNow();
   }
 
-  private static class HeartbeatCallable implements Callable<Boolean> {
+  @VisibleForTesting
+  static class HeartbeatCallable implements Callable<Boolean> {
 
     private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
     private static final float LOG_COUNTER_BACKOFF = 1.3f;
@@ -172,25 +173,30 @@ public class TaskReporter {
     public Boolean call() throws Exception {
       // Heartbeat only for active tasks. Errors, etc will be reported directly.
       while (!task.isTaskDone() && !task.hadFatalError()) {
-        boolean result = heartbeat(null);
-        if (!result) {
+        ResponseWrapper response = heartbeat(null);
+
+        if (response.shouldDie) {
           // AM sent a shouldDie=true
           LOG.info("Asked to die via task heartbeat");
           return false;
-        }
-        lock.lock();
-        try {
-          boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
-          if (!interrupted) {
-            nonOobHeartbeatCounter++;
+        } else {
+          if (response.numEvents < maxEventsToGet) {
+            // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
+            lock.lock();
+            try {
+              boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+              if (!interrupted) {
+                nonOobHeartbeatCounter++;
+              }
+            } finally {
+              lock.unlock();
+            }
           }
-        } finally {
-          lock.unlock();
         }
       }
       int pendingEventCount = eventsToSend.size();
       if (pendingEventCount > 0) {
-        LOG.warn("Exiting TaskReporter therad with pending queue size=" + pendingEventCount);
+        LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
       }
       return true;
     }
@@ -203,7 +209,7 @@ public class TaskReporter {
      * @throws TezException
      *           indicates an exception somewhere in the AM.
      */
-    private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+    private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
         TezException {
 
       if (eventsArg != null) {
@@ -247,7 +253,7 @@ public class TaskReporter {
 
       if (response.shouldDie()) {
         LOG.info("Received should die response from AM");
-        return false;
+        return new ResponseWrapper(true, 1);
       }
       if (response.getLastRequestId() != requestId) {
         throw new TezException("AM and Task out of sync" + ", responseReqId="
@@ -256,6 +262,7 @@ public class TaskReporter {
 
       // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
       // are running using the same umbilical.
+      int numEventsReceived = 0;
       if (task.isTaskDone() || task.hadFatalError()) {
         if (response.getEvents() != null && !response.getEvents().isEmpty()) {
           LOG.warn("Current task already complete, Ignoring all event in"
@@ -268,11 +275,11 @@ public class TaskReporter {
                 + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
           }
           // This should ideally happen in a separate thread
+          numEventsReceived = response.getEvents().size();
           task.handleEvents(response.getEvents());
         }
       }
-      return true;
-
+      return new ResponseWrapper(false, numEventsReceived);
     }
 
     public void markComplete() {
@@ -308,7 +315,7 @@ public class TaskReporter {
           task.getProgress()), updateEventMetadata);
       TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
           updateEventMetadata);
-      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent));
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
     }
 
     /**
@@ -334,7 +341,7 @@ public class TaskReporter {
       }
       TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
           srcMeta == null ? updateEventMetadata : srcMeta);
-      return heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
+      return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
     }
 
     private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
@@ -381,4 +388,14 @@ public class TaskReporter {
   public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
     return umbilical.canCommit(taskAttemptID);
   }
+
+  private static final class ResponseWrapper {
+    boolean shouldDie;
+    int numEvents;
+
+    private ResponseWrapper(boolean shouldDie, int numEvents) {
+      this.shouldDie = shouldDie;
+      this.numEvents = numEvents;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
new file mode 100644
index 0000000..de03307
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTaskReporter {
+
+  @Test(timeout = 10000)
+  public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
+
+    final Object lock = new Object();
+    final AtomicBoolean hb2Done = new AtomicBoolean(false);
+
+    TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
+        if (request.getRequestId() == 1 || request.getRequestId() == 2) {
+          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
+          response.setLastRequestId(request.getRequestId());
+          return response;
+        } else if (request.getRequestId() == 3) {
+          TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
+          response.setLastRequestId(request.getRequestId());
+          synchronized (lock) {
+            hb2Done.set(true);
+            lock.notify();
+          }
+          return response;
+        } else {
+          throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
+        }
+      }
+    }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
+
+    TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+    LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+    doReturn("vertexName").when(mockTask).getVertexName();
+    doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+
+    // Setup the sleep time to be way higher than the test timeout
+    TaskReporter.HeartbeatCallable heartbeatCallable =
+        new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+            new AtomicLong(0),
+            "containerIdStr");
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    executor.submit(heartbeatCallable);
+    try {
+      synchronized (lock) {
+        if (!hb2Done.get()) {
+          lock.wait();
+        }
+      }
+      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+      Thread.sleep(2000l);
+      // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
+      verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+    } finally {
+      executor.shutdownNow();
+    }
+
+  }
+
+  private List<TezEvent> createEvents(int numEvents) {
+    List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
+    for (int i = 0; i < numEvents; i++) {
+      list.add(mock(TezEvent.class));
+    }
+    return list;
+  }
+}