You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:08 UTC
[23/25] git commit: TEZ-1533. Request Events more often if a complete
set of events is received by a task. (sseth)
TEZ-1533. Request Events more often if a complete set of events is
received by a task. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8bd2ca35
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8bd2ca35
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8bd2ca35
Branch: refs/heads/TEZ-8
Commit: 8bd2ca352664fab8d248593f60faa6c0ff223920
Parents: 3b34c41
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 16 03:54:45 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 16 03:54:45 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/runtime/task/TaskReporter.java | 53 ++++++---
.../tez/runtime/task/TestTaskReporter.java | 115 +++++++++++++++++++
3 files changed, 151 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca4e7e2..83048bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -42,6 +42,7 @@ ALL CHANGES
of DAG submission
TEZ-1571. Add create method for DataSinkDescriptor.
TEZ-1585. Memory leak in tez session mode.
+ TEZ-1533. Request Events more often if a complete set of events is received by a task.
Release 0.5.0: 2014-09-03
http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index fcb8778..15dcbb0 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -115,7 +115,8 @@ public class TaskReporter {
heartbeatExecutor.shutdownNow();
}
- private static class HeartbeatCallable implements Callable<Boolean> {
+ @VisibleForTesting
+ static class HeartbeatCallable implements Callable<Boolean> {
private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
private static final float LOG_COUNTER_BACKOFF = 1.3f;
@@ -172,25 +173,30 @@ public class TaskReporter {
public Boolean call() throws Exception {
// Heartbeat only for active tasks. Errors, etc will be reported directly.
while (!task.isTaskDone() && !task.hadFatalError()) {
- boolean result = heartbeat(null);
- if (!result) {
+ ResponseWrapper response = heartbeat(null);
+
+ if (response.shouldDie) {
// AM sent a shouldDie=true
LOG.info("Asked to die via task heartbeat");
return false;
- }
- lock.lock();
- try {
- boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
- if (!interrupted) {
- nonOobHeartbeatCounter++;
+ } else {
+ if (response.numEvents < maxEventsToGet) {
+ // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat
+ lock.lock();
+ try {
+ boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS);
+ if (!interrupted) {
+ nonOobHeartbeatCounter++;
+ }
+ } finally {
+ lock.unlock();
+ }
}
- } finally {
- lock.unlock();
}
}
int pendingEventCount = eventsToSend.size();
if (pendingEventCount > 0) {
- LOG.warn("Exiting TaskReporter therad with pending queue size=" + pendingEventCount);
+ LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount);
}
return true;
}
@@ -203,7 +209,7 @@ public class TaskReporter {
* @throws TezException
* indicates an exception somewhere in the AM.
*/
- private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
+ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
TezException {
if (eventsArg != null) {
@@ -247,7 +253,7 @@ public class TaskReporter {
if (response.shouldDie()) {
LOG.info("Received should die response from AM");
- return false;
+ return new ResponseWrapper(true, 1);
}
if (response.getLastRequestId() != requestId) {
throw new TezException("AM and Task out of sync" + ", responseReqId="
@@ -256,6 +262,7 @@ public class TaskReporter {
// The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
// are running using the same umbilical.
+ int numEventsReceived = 0;
if (task.isTaskDone() || task.hadFatalError()) {
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
LOG.warn("Current task already complete, Ignoring all event in"
@@ -268,11 +275,11 @@ public class TaskReporter {
+ task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
}
// This should ideally happen in a separate thread
+ numEventsReceived = response.getEvents().size();
task.handleEvents(response.getEvents());
}
}
- return true;
-
+ return new ResponseWrapper(false, numEventsReceived);
}
public void markComplete() {
@@ -308,7 +315,7 @@ public class TaskReporter {
task.getProgress()), updateEventMetadata);
TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
updateEventMetadata);
- return heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent));
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
}
/**
@@ -334,7 +341,7 @@ public class TaskReporter {
}
TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
srcMeta == null ? updateEventMetadata : srcMeta);
- return heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent));
+ return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskAttemptFailedEvent)).shouldDie;
}
private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
@@ -381,4 +388,14 @@ public class TaskReporter {
public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
return umbilical.canCommit(taskAttemptID);
}
+
+ private static final class ResponseWrapper {
+ boolean shouldDie;
+ int numEvents;
+
+ private ResponseWrapper(boolean shouldDie, int numEvents) {
+ this.shouldDie = shouldDie;
+ this.numEvents = numEvents;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8bd2ca35/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
new file mode 100644
index 0000000..de03307
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTaskReporter {
+
+ @Test(timeout = 10000)
+ public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicBoolean hb2Done = new AtomicBoolean(false);
+
+ TezTaskUmbilicalProtocol mockUmbilical = mock(TezTaskUmbilicalProtocol.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ TezHeartbeatRequest request = (TezHeartbeatRequest) args[0];
+ if (request.getRequestId() == 1 || request.getRequestId() == 2) {
+ TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(5));
+ response.setLastRequestId(request.getRequestId());
+ return response;
+ } else if (request.getRequestId() == 3) {
+ TezHeartbeatResponse response = new TezHeartbeatResponse(createEvents(1));
+ response.setLastRequestId(request.getRequestId());
+ synchronized (lock) {
+ hb2Done.set(true);
+ lock.notify();
+ }
+ return response;
+ } else {
+ throw new TezUncheckedException("Invalid request id for test: " + request.getRequestId());
+ }
+ }
+ }).when(mockUmbilical).heartbeat(any(TezHeartbeatRequest.class));
+
+ TezTaskAttemptID mockTaskAttemptId = mock(TezTaskAttemptID.class);
+ LogicalIOProcessorRuntimeTask mockTask = mock(LogicalIOProcessorRuntimeTask.class);
+ doReturn("vertexName").when(mockTask).getVertexName();
+ doReturn(mockTaskAttemptId).when(mockTask).getTaskAttemptID();
+
+ // Setup the sleep time to be way higher than the test timeout
+ TaskReporter.HeartbeatCallable heartbeatCallable =
+ new TaskReporter.HeartbeatCallable(mockTask, mockUmbilical, 100000, 100000, 5,
+ new AtomicLong(0),
+ "containerIdStr");
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.submit(heartbeatCallable);
+ try {
+ synchronized (lock) {
+ if (!hb2Done.get()) {
+ lock.wait();
+ }
+ }
+ verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+ Thread.sleep(2000l);
+ // Sleep for 2 seconds, less than the callable sleep time. No more invocations.
+ verify(mockUmbilical, times(3)).heartbeat(any(TezHeartbeatRequest.class));
+ } finally {
+ executor.shutdownNow();
+ }
+
+ }
+
+ private List<TezEvent> createEvents(int numEvents) {
+ List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
+ for (int i = 0; i < numEvents; i++) {
+ list.add(mock(TezEvent.class));
+ }
+ return list;
+ }
+}