You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/25 12:20:07 UTC
tez git commit: TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 7806ae9b4 -> 0efaae854
TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0efaae85
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0efaae85
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0efaae85
Branch: refs/heads/master
Commit: 0efaae854ed9743f3d07b7ce6fc85279eac14dd5
Parents: 7806ae9
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Jun 25 15:52:00 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Jun 25 15:52:00 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/api/VertexManagerPluginContext.java | 9 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 58 +++++++++++++++-----
.../tez/dag/app/TestMemoryWithEvents.java | 6 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 31 +++++++++++
5 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c46bf26..90164cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Apache Tez Change Log
Release 0.8.0: Unreleased
INCOMPATIBLE CHANGES
+ TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
TEZ-2468. Change the minimum Java version to Java 7.
http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 345ea43..a6096d8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.VertexStatistics;
@@ -61,7 +60,7 @@ public interface VertexManagerPluginContext {
return locationHint;
}
}
-
+
/**
* Get the edge properties on the input edges of this vertex. The input edge
* is represented by the source vertex name
@@ -79,9 +78,9 @@ public interface VertexManagerPluginContext {
/**
* Get a {@link VertexStatistics} object to find out execution statistics
* about the given {@link Vertex}.
- * <br>This only provides point in time values for the statistics and must be
- * called again to get updated values.
- *
+ * <br>This only provides point in time values for statistics (completed tasks)
+ * and must be called again to get updated values.
+ *
* @param vertexName
* Name of the {@link Vertex}
* @return {@link VertexStatistics} for the given vertex
http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index aa8f593..a9bcdd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -76,6 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -783,7 +785,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private VertexStats vertexStats = null;
private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
-
+
+ @VisibleForTesting
+ VertexStatisticsImpl completedTasksStatsCache;
+
static class EventInfo {
final TezEvent tezEvent;
final Edge eventEdge;
@@ -814,23 +819,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
class VertexStatisticsImpl implements VertexStatistics {
final Map<String, IOStatisticsImpl> ioStats;
-
+ final BitSet taskSet;
+
public VertexStatisticsImpl() {
ioStats = Maps.newHashMapWithExpectedSize(ioIndices.size());
- for (String name : ioIndices.keySet()) {
+ taskSet = new BitSet();
+ for (String name : getIOIndices().keySet()) {
ioStats.put(name, new IOStatisticsImpl());
}
}
-
+
public IOStatisticsImpl getIOStatistics(String ioName) {
return ioStats.get(ioName);
}
-
+
void mergeFrom(TaskStatistics taskStats) {
if (taskStats == null) {
return;
}
-
+
for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats
.getIOStatistics().entrySet()) {
String ioName = entry.getKey();
@@ -850,8 +857,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
public OutputStatistics getOutputStatistics(String outputName) {
return getIOStatistics(outputName);
}
+
+ void addTask(TezTaskID taskID) {
+ taskSet.set(taskID.getId());
+ }
+
+ boolean containsTask(TezTaskID taskID) {
+ return taskSet.get(taskID.getId());
+ }
}
-
+
+ void resetCompletedTaskStatsCache(boolean recompute) {
+ completedTasksStatsCache = new VertexStatisticsImpl();
+ if (recompute) {
+ for (Task t : getTasks().values()) {
+ if (t.getState() == TaskState.SUCCEEDED) {
+ completedTasksStatsCache.mergeFrom(((TaskImpl) t).getStatistics());
+ }
+ }
+ }
+ }
+
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration dagConf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
@@ -3595,15 +3621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.constructFinalFullcounters();
}
}
-
+
private VertexStatisticsImpl constructStatistics() {
- VertexStatisticsImpl stats = new VertexStatisticsImpl();
- for (Task t : this.tasks.values()) {
- TaskStatistics taskStats = ((TaskImpl)t).getStatistics();
- stats.mergeFrom(taskStats);
- }
-
- return stats;
+ return completedTasksStatsCache;
}
@Private
@@ -3816,6 +3836,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
@Override
public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ if (vertex.completedTasksStatsCache == null) {
+ vertex.resetCompletedTaskStatsCache(false);
+ }
boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
@@ -3824,6 +3847,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
Task task = vertex.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(vertex, task);
+ if (!vertex.completedTasksStatsCache.containsTask(task.getTaskId())) {
+ vertex.completedTasksStatsCache.addTask(task.getTaskId());
+ vertex.completedTasksStatsCache.mergeFrom(((TaskImpl) task).getStatistics());
+ }
} else if (taskEvent.getState() == TaskState.FAILED) {
LOG.info("Failing vertex: " + vertex.logIdentifier +
" because task failed: " + taskEvent.getTaskID());
@@ -3871,6 +3898,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
//succeeded task is restarted back
vertex.completedTaskCount--;
vertex.succeededTaskCount--;
+ vertex.resetCompletedTaskStatsCache(true);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
index c277b38..9816e20 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
import java.io.IOException;
import java.util.List;
+import com.google.common.base.Stopwatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -116,6 +117,8 @@ public class TestMemoryWithEvents {
}
private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.start();
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
@@ -133,7 +136,8 @@ public class TestMemoryWithEvents {
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
checkMemory(dag.getName(), mockApp);
-
+ stopwatch.stop();
+ System.out.println("Time taken(ms): " + stopwatch.elapsedMillis());
tezClient.stop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 8b2a1b4..91465b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.runtime.api.VertexStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -5778,6 +5779,36 @@ public class TestVertexImpl {
Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
}
+ @Test (timeout=5000)
+ public void testCompletedStatsCache() {
+ initAllVertices(VertexState.INITED);
+ VertexImpl v = vertices.get("vertex2");
+ startVertex(v);
+
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+
+ dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+ dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+ dispatcher.await();
+
+
+ VertexStatistics stats = v.getStatistics();
+
+ //Ensure that task 0 is available in completed stats cache
+ Assert.assertTrue(v.completedTasksStatsCache.taskSet.get(0));
+
+ //Reschedule task 0
+ dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(t1));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+
+ //cache should be cleared
+ Assert.assertTrue(v.completedTasksStatsCache.taskSet.cardinality() == 0);
+ }
+
@Test (timeout = 5000)
public void testRouteEvent_RecoveredEvent() throws IOException {
doReturn(historyEventHandler).when(appContext).getHistoryHandler();