You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2012/10/01 06:28:07 UTC

svn commit: r1392202 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: QueryPlan.java exec/HadoopJobExecHelper.java plan/ReducerTimeStatsPerJob.java

Author: cws
Date: Mon Oct  1 04:28:06 2012
New Revision: 1392202

URL: http://svn.apache.org/viewvc?rev=1392202&view=rev
Log:
add instrumentation to capture if there is skew in reducers (Arun Dobriya via cws)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java   (with props)
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1392202&r1=1392201&r2=1392202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Mon Oct  1 04:28:06 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.hooks.R
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
 import org.apache.hadoop.hive.ql.plan.api.NodeType;
 import org.apache.hadoop.hive.ql.plan.api.TaskType;
@@ -67,6 +68,7 @@ public class QueryPlan implements Serial
 
   private ArrayList<Task<? extends Serializable>> rootTasks;
   private FetchTask fetchTask;
+  private final List<ReducerTimeStatsPerJob> reducerTimeStatsPerJobList;
 
   private HashSet<ReadEntity> inputs;
   /**
@@ -94,12 +96,14 @@ public class QueryPlan implements Serial
   private transient Long queryStartTime;
 
   public QueryPlan() {
+    this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>(); // final field: starts empty, must be assigned in every constructor
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime) {
     this.queryString = queryString;
 
     rootTasks = new ArrayList<Task<? extends Serializable>>();
+    this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
     rootTasks.addAll(sem.getRootTasks());
     fetchTask = sem.getFetchTask();
     // Note that inputs and outputs can be changed when the query gets executed
@@ -706,6 +710,10 @@ public class QueryPlan implements Serial
     return query;
   }
 
+  public List<ReducerTimeStatsPerJob> getReducerTimeStatsPerJobList() {
+    return this.reducerTimeStatsPerJobList; // the live, mutable list (not a copy); HadoopJobExecHelper appends to it
+  }
+
   public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
     this.query = query; // replaces the stored query object; no copy or validation is done
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1392202&r1=1392201&r2=1392202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Oct  1 04:28:06 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
@@ -696,6 +697,12 @@ public class HadoopJobExecHelper {
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
       SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+
+      // Compute reducer run-time statistics for every MapReduce job,
+      // whether it succeeded or failed
+      if (this.task.getQueryPlan() != null) {
+        computeReducerTimeStatsPerJob(rj);
+      }
     }
 
     boolean success = mapRedStats.isSuccess();
@@ -733,6 +740,31 @@ public class HadoopJobExecHelper {
     return returnVal;
   }
 
+  // Collects the run time of every non-map task completion event of rj and appends the resulting ReducerTimeStatsPerJob to the QueryPlan (caller guarantees the plan is non-null).
+  private void computeReducerTimeStatsPerJob(RunningJob rj) throws IOException {
+    TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(0); // NOTE(review): one call starting at event 0; if the API pages its results, later events are never seen -- confirm
+    List<Integer> reducersRunTimes = new ArrayList<Integer>();
+    // Events are not filtered by status: every event that is not a map task is counted as a reducer.
+    for (TaskCompletionEvent taskCompletion : taskCompletions) {
+      String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion);
+      if (taskJobIds == null) {
+        // Task attempt info is unavailable in this Hadoop version; skip this event
+        continue;
+      }
+      String taskId = taskJobIds[0]; // NOTE(review): never read afterwards; candidate for removal
+      if (!taskCompletion.isMapTask()) {
+        reducersRunTimes.add(new Integer(taskCompletion.getTaskRunTime())); // NOTE(review): explicit boxing; Integer.valueOf or autoboxing would do
+      }
+    }
+    // Compute the reducer run-time statistics for the job (all -1 when no reducer events were collected)
+    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes,
+        new String(this.jobId)); // NOTE(review): the String copy looks redundant since Strings are immutable
+    // Add the job's reducer run-time statistics to the QueryPlan
+    this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
+    return;
+  }
+
+
   private Map<String, Double> extractAllCounterValues(Counters counters) {
     Map<String, Double> exctractedCounters = new HashMap<String, Double>();
     for (Counters.Group cg : counters) {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1392202&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Mon Oct  1 04:28:06 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+
+/**
+ * Encapsulates statistics about the duration of all reduce tasks
+ * corresponding to a specific JobId.
+ * The stats are computed in the HadoopJobExecHelper when the
+ * job completes and are then stored in the QueryPlan, one entry
+ * per job, from where they can be accessed later.
+ * The reducer statistics consist of the minimum, maximum, mean and
+ * standard deviation of the run times of all the reduce tasks for a
+ * job. All run times are in milliseconds.
+ */
+public class ReducerTimeStatsPerJob {
+
+  // Identifier of the job these statistics belong to
+  private final String jobId;
+
+  // Run-time statistics, in milliseconds, over the reducers of that job;
+  // every one of them is -1 when no run times were supplied
+  private final long minimumTime;
+  private final long maximumTime;
+  private final double meanTime;
+  private final double standardDeviationTime;
+
+
+  /**
+   * Computes minimum, maximum, mean and standard deviation of {@code reducersRunTimes} for job {@code jobId}.
+   * An empty list sets all four statistics to -1; a null list or null element throws NullPointerException.
+   */
+  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) {
+    this.jobId = jobId;
+
+    // Non-empty input: compute the statistics here; the empty case falls through to the -1 sentinels below
+    if (!reducersRunTimes.isEmpty()) {
+      long minimumTime = reducersRunTimes.get(0);
+      long maximumTime = reducersRunTimes.get(0);
+      long totalTime = reducersRunTimes.get(0);
+      double standardDeviationTime = 0.0;
+      double meanTime = 0.0;
+      // First pass: element 0 already seeded min, max and sum, so start at index 1
+      for (int i = 1; i < reducersRunTimes.size(); i++) {
+        if (reducersRunTimes.get(i) < minimumTime) {
+          minimumTime = reducersRunTimes.get(i);
+        }
+        if (reducersRunTimes.get(i) > maximumTime) {
+          maximumTime = reducersRunTimes.get(i);
+        }
+        totalTime += reducersRunTimes.get(i);
+      }
+      meanTime = (double) totalTime / reducersRunTimes.size();
+      // Second pass: accumulate the squared deviations from the mean
+      for (int i = 0; i < reducersRunTimes.size(); i++) {
+        standardDeviationTime += Math.pow(meanTime - reducersRunTimes.get(i), 2);
+      }
+      standardDeviationTime /= reducersRunTimes.size(); // divides by N, i.e. population (not sample) variance
+      standardDeviationTime = Math.sqrt(standardDeviationTime);
+
+      this.minimumTime = minimumTime;
+      this.maximumTime = maximumTime;
+      this.meanTime = meanTime;
+      this.standardDeviationTime = standardDeviationTime;
+      return;
+    }
+    this.minimumTime = -1; // sentinel values: no run times were supplied
+    this.maximumTime = -1;
+    this.meanTime = -1.0;
+    this.standardDeviationTime = -1.0;
+    return;
+  }
+
+  public long getMinimumTime() {
+    return this.minimumTime; // smallest run time, or -1 if none were supplied
+  }
+
+  public long getMaximumTime() {
+    return this.maximumTime; // largest run time, or -1 if none were supplied
+  }
+
+  public double getMeanTime() {
+    return this.meanTime; // arithmetic mean, or -1.0 if none were supplied
+  }
+
+  public double getStandardDeviationTime() {
+    return this.standardDeviationTime; // population standard deviation, or -1.0 if none were supplied
+  }
+
+  public String getJobId() {
+    return this.jobId; // the job identifier exactly as passed to the constructor
+  }
+
+}

Propchange: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
------------------------------------------------------------------------------
    svn:eol-style = native