You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/25 12:20:07 UTC

tez git commit: TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 7806ae9b4 -> 0efaae854


TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0efaae85
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0efaae85
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0efaae85

Branch: refs/heads/master
Commit: 0efaae854ed9743f3d07b7ce6fc85279eac14dd5
Parents: 7806ae9
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Jun 25 15:52:00 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Jun 25 15:52:00 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/api/VertexManagerPluginContext.java |  9 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 58 +++++++++++++++-----
 .../tez/dag/app/TestMemoryWithEvents.java       |  6 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 31 +++++++++++
 5 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c46bf26..90164cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Apache Tez Change Log
 Release 0.8.0: Unreleased
 
 INCOMPATIBLE CHANGES
+  TEZ-2565. Consider scanning unfinished tasks in VertexImpl::constructStatistics to reduce merge overhead.
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
   TEZ-2468. Change the minimum Java version to Java 7.
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 345ea43..a6096d8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.VertexStatistics;
@@ -61,7 +60,7 @@ public interface VertexManagerPluginContext {
       return locationHint;
     }
   }
-  
+
   /**
    * Get the edge properties on the input edges of this vertex. The input edge 
    * is represented by the source vertex name
@@ -79,9 +78,9 @@ public interface VertexManagerPluginContext {
   /**
    * Get a {@link VertexStatistics} object to find out execution statistics
    * about the given {@link Vertex}.
-   * <br>This only provides point in time values for the statistics and must be
-   * called again to get updated values.
-   * 
+   * <br>This only provides point-in-time values for statistics aggregated over
+   * successfully completed tasks, and must be called again to get updated values.
+   *
    * @param vertexName
    *          Name of the {@link Vertex}
    * @return {@link VertexStatistics} for the given vertex

http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index aa8f593..a9bcdd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -76,6 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -783,7 +785,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private VertexStats vertexStats = null;
 
   private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
-  
+
+  @VisibleForTesting
+  VertexStatisticsImpl completedTasksStatsCache;
+
   static class EventInfo {
     final TezEvent tezEvent;
     final Edge eventEdge;
@@ -814,23 +819,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   class VertexStatisticsImpl implements VertexStatistics {
     final Map<String, IOStatisticsImpl> ioStats;
-    
+    final BitSet taskSet;
+
     public VertexStatisticsImpl() {
       ioStats = Maps.newHashMapWithExpectedSize(ioIndices.size());
-      for (String name : ioIndices.keySet()) {
+      taskSet = new BitSet();
+      for (String name : getIOIndices().keySet()) {
         ioStats.put(name, new IOStatisticsImpl());
       }
     }
-    
+
     public IOStatisticsImpl getIOStatistics(String ioName) {
       return ioStats.get(ioName);
     }
-    
+
     void mergeFrom(TaskStatistics taskStats) {
       if (taskStats == null) {
         return;
       }
-      
+
       for (Map.Entry<String, org.apache.tez.runtime.api.impl.IOStatistics> entry : taskStats
           .getIOStatistics().entrySet()) {
         String ioName = entry.getKey();
@@ -850,8 +857,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     public OutputStatistics getOutputStatistics(String outputName) {
       return getIOStatistics(outputName);
     }
+
+    void addTask(TezTaskID taskID) {
+      taskSet.set(taskID.getId());
+    }
+
+    boolean containsTask(TezTaskID taskID) {
+      return taskSet.get(taskID.getId());
+    }
   }
-  
+
+  void resetCompletedTaskStatsCache(boolean recompute) {
+    completedTasksStatsCache = new VertexStatisticsImpl();
+    if (recompute) {
+      for (Task t : getTasks().values()) {
+        if (t.getState() == TaskState.SUCCEEDED) {
+          completedTasksStatsCache.mergeFrom(((TaskImpl) t).getStatistics());
+        }
+      }
+    }
+  }
+
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration dagConf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
@@ -3595,15 +3621,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       this.constructFinalFullcounters();
     }
   }
-  
+
   private VertexStatisticsImpl constructStatistics() {
-    VertexStatisticsImpl stats = new VertexStatisticsImpl();
-    for (Task t : this.tasks.values()) {
-      TaskStatistics  taskStats = ((TaskImpl)t).getStatistics();
-      stats.mergeFrom(taskStats);
-    }
-    
-    return stats;
+    return completedTasksStatsCache;
   }
 
   @Private
@@ -3816,6 +3836,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      if (vertex.completedTasksStatsCache == null) {
+        vertex.resetCompletedTaskStatsCache(false);
+      }
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
       LOG.info("Num completed Tasks for " + vertex.logIdentifier + " : "
@@ -3824,6 +3847,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       Task task = vertex.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {
         taskSucceeded(vertex, task);
+        if (!vertex.completedTasksStatsCache.containsTask(task.getTaskId())) {
+          vertex.completedTasksStatsCache.addTask(task.getTaskId());
+          vertex.completedTasksStatsCache.mergeFrom(((TaskImpl) task).getStatistics());
+        }
       } else if (taskEvent.getState() == TaskState.FAILED) {
         LOG.info("Failing vertex: " + vertex.logIdentifier +
             " because task failed: " + taskEvent.getTaskID());
@@ -3871,6 +3898,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       //succeeded task is restarted back
       vertex.completedTaskCount--;
       vertex.succeededTaskCount--;
+      vertex.resetCompletedTaskStatsCache(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
index c277b38..9816e20 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMemoryWithEvents.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.base.Stopwatch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -116,6 +117,8 @@ public class TestMemoryWithEvents {
   }
   
   private void testMemory(DAG dag, boolean sendDMEvents) throws Exception {
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
 
     MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
@@ -133,7 +136,8 @@ public class TestMemoryWithEvents {
     DAGStatus status = dagClient.waitForCompletion();
     Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
     checkMemory(dag.getName(), mockApp);
-    
+    stopwatch.stop();
+    System.out.println("Time taken(ms): " + stopwatch.elapsedMillis());
     tezClient.stop();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/0efaae85/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 8b2a1b4..91465b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.runtime.api.VertexStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -5778,6 +5779,36 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause());
   }
 
+  @Test (timeout=5000)
+  public void testCompletedStatsCache() {
+    initAllVertices(VertexState.INITED);
+    VertexImpl v = vertices.get("vertex2");
+    startVertex(v);
+
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+
+    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    dispatcher.getEventHandler().handle(new TaskEventTAUpdate(TezTaskAttemptID.getInstance(t1, 0),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+
+
+    VertexStatistics stats = v.getStatistics();
+
+    //Ensure that task 0 is available in completed stats cache
+    Assert.assertTrue(v.completedTasksStatsCache.taskSet.get(0));
+
+    //Reschedule task 0
+    dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(t1));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+
+    //cache should be cleared
+    Assert.assertTrue(v.completedTasksStatsCache.taskSet.cardinality() == 0);
+  }
+
   @Test (timeout = 5000)
   public void testRouteEvent_RecoveredEvent() throws IOException {
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();