You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by cw...@apache.org on 2012/10/01 06:28:07 UTC
svn commit: r1392202 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
QueryPlan.java exec/HadoopJobExecHelper.java plan/ReducerTimeStatsPerJob.java
Author: cws
Date: Mon Oct 1 04:28:06 2012
New Revision: 1392202
URL: http://svn.apache.org/viewvc?rev=1392202&view=rev
Log:
add instrumentation to capture if there is skew in reducers (Arun Dobriya via cws)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (with props)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1392202&r1=1392201&r2=1392202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Mon Oct 1 04:28:06 2012
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.hooks.R
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.plan.api.AdjacencyType;
import org.apache.hadoop.hive.ql.plan.api.NodeType;
import org.apache.hadoop.hive.ql.plan.api.TaskType;
@@ -67,6 +68,7 @@ public class QueryPlan implements Serial
private ArrayList<Task<? extends Serializable>> rootTasks;
private FetchTask fetchTask;
+ private final List<ReducerTimeStatsPerJob> reducerTimeStatsPerJobList;
private HashSet<ReadEntity> inputs;
/**
@@ -94,12 +96,14 @@ public class QueryPlan implements Serial
private transient Long queryStartTime;
public QueryPlan() {
+ // The list is final, so every constructor has to initialize it; it starts
+ // out empty and is filled through getReducerTimeStatsPerJobList().
+ this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>();
+ this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
rootTasks.addAll(sem.getRootTasks());
fetchTask = sem.getFetchTask();
// Note that inputs and outputs can be changed when the query gets executed
@@ -706,6 +710,10 @@ public class QueryPlan implements Serial
return query;
}
+  /**
+   * Gives access to the per-job reducer run-time statistics held by this plan.
+   * The plan's own list is handed out (not a copy), so entries a caller adds
+   * to it become part of the plan.
+   *
+   * @return the list of reducer run-time statistics, one entry per recorded job
+   */
+  public List<ReducerTimeStatsPerJob> getReducerTimeStatsPerJobList() {
+    final List<ReducerTimeStatsPerJob> statsPerJob = reducerTimeStatsPerJobList;
+    return statsPerJob;
+  }
+
public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) {
this.query = query;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1392202&r1=1392201&r2=1392202&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Oct 1 04:28:06 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher;
@@ -696,6 +697,12 @@ public class HadoopJobExecHelper {
// for special modes. In that case, SessionState.get() is empty.
if (SessionState.get() != null) {
SessionState.get().getLastMapRedStatsList().add(mapRedStats);
+
+ // Compute the reducer run-time statistics (used to detect skew) for
+ // every MapReduce job, irrespective of its success or failure
+ if (this.task.getQueryPlan() != null) {
+ computeReducerTimeStatsPerJob(rj);
+ }
}
boolean success = mapRedStats.isSuccess();
@@ -733,6 +740,31 @@ public class HadoopJobExecHelper {
return returnVal;
}
+
+  /**
+   * Gathers the run times reported by the non-map task completion events of
+   * the given job, summarizes them in a {@link ReducerTimeStatsPerJob} and
+   * appends that summary to the list kept by the QueryPlan of the current task.
+   *
+   * <p>The caller is expected to have checked that
+   * {@code this.task.getQueryPlan()} is not null.
+   *
+   * <p>NOTE(review): every non-map completion event contributes a run time;
+   * presumably that includes failed or killed attempts - confirm this is the
+   * intended sample for skew detection.
+   *
+   * @param rj the job whose task completion events are read
+   * @throws IOException propagated from the calls made while reading the events
+   */
+  private void computeReducerTimeStatsPerJob(RunningJob rj) throws IOException {
+    List<Integer> reducersRunTimes = new ArrayList<Integer>();
+
+    // getTaskCompletionEvents(startFrom) returns the events in bounded batches,
+    // so a single call with offset 0 only sees the first batch. Keep fetching
+    // from the next offset until a batch comes back empty.
+    int startIndex = 0;
+    while (true) {
+      TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(startIndex);
+      if (taskCompletions == null || taskCompletions.length == 0) {
+        break;
+      }
+
+      for (TaskCompletionEvent taskCompletion : taskCompletions) {
+        String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion);
+        if (taskJobIds == null) {
+          // Task attempt info is unavailable in this Hadoop version
+          continue;
+        }
+        if (!taskCompletion.isMapTask()) {
+          reducersRunTimes.add(Integer.valueOf(taskCompletion.getTaskRunTime()));
+        }
+      }
+      startIndex += taskCompletions.length;
+    }
+
+    // Compute the reducers run time statistics for the job
+    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes,
+        new String(this.jobId));
+    // Adding the reducers run time statistics for the job in the QueryPlan
+    this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
+  }
+
+
private Map<String, Double> extractAllCounterValues(Counters counters) {
Map<String, Double> exctractedCounters = new HashMap<String, Double>();
for (Counters.Group cg : counters) {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1392202&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Mon Oct 1 04:28:06 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+
+/**
+ * Immutable summary of the run times of the reduce tasks of one job.
+ *
+ * <p>HadoopJobExecHelper builds one instance per job once the job has
+ * completed and appends it to the list held by the QueryPlan, from where it
+ * can be read later on. The summary consists of the minimum, maximum, mean
+ * and (population) standard deviation of the supplied run times; all values
+ * are in milliseconds. When no run times are available, every statistic is -1.
+ *
+ * <p>The class is Serializable because QueryPlan, which keeps instances of it
+ * in a non-transient field, is declared Serializable.
+ */
+public class ReducerTimeStatsPerJob implements java.io.Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Marker stored in every statistic when there is nothing to summarize
+  private static final long NO_VALUE = -1;
+
+  // stores the JobId of the job
+  private final String jobId;
+
+  // Stores the temporal statistics in milliseconds for reducers
+  // specific to a Job
+  private final long minimumTime;
+  private final long maximumTime;
+  private final double meanTime;
+  private final double standardDeviationTime;
+
+  /**
+   * Computes the run time statistics of the reducers of a specific job.
+   *
+   * @param reducersRunTimes run times of the reduce tasks in milliseconds;
+   *          a null or empty list yields -1 for every statistic
+   * @param jobId the id of the job the run times belong to
+   */
+  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) {
+    this.jobId = jobId;
+
+    // A null list is treated like an empty one: no values to summarize.
+    if (reducersRunTimes == null || reducersRunTimes.isEmpty()) {
+      this.minimumTime = NO_VALUE;
+      this.maximumTime = NO_VALUE;
+      this.meanTime = NO_VALUE;
+      this.standardDeviationTime = NO_VALUE;
+      return;
+    }
+
+    // First pass: extremes and total. The total is a long so that summing
+    // many int run times cannot overflow.
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long total = 0;
+    for (int runTime : reducersRunTimes) {
+      min = Math.min(min, runTime);
+      max = Math.max(max, runTime);
+      total += runTime;
+    }
+    final int count = reducersRunTimes.size();
+    final double mean = (double) total / count;
+
+    // Second pass: population variance around the mean.
+    double squaredDeviations = 0.0;
+    for (int runTime : reducersRunTimes) {
+      final double deviation = mean - runTime;
+      squaredDeviations += deviation * deviation;
+    }
+
+    this.minimumTime = min;
+    this.maximumTime = max;
+    this.meanTime = mean;
+    this.standardDeviationTime = Math.sqrt(squaredDeviations / count);
+  }
+
+  /** @return the shortest run time in milliseconds, or -1 if there were none */
+  public long getMinimumTime() {
+    return this.minimumTime;
+  }
+
+  /** @return the longest run time in milliseconds, or -1 if there were none */
+  public long getMaximumTime() {
+    return this.maximumTime;
+  }
+
+  /** @return the mean run time in milliseconds, or -1 if there were none */
+  public double getMeanTime() {
+    return this.meanTime;
+  }
+
+  /**
+   * @return the population standard deviation of the run times in
+   *         milliseconds, or -1 if there were none
+   */
+  public double getStandardDeviationTime() {
+    return this.standardDeviationTime;
+  }
+
+  /** @return the id of the job these statistics belong to */
+  public String getJobId() {
+    return this.jobId;
+  }
+
+}
Propchange: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
------------------------------------------------------------------------------
svn:eol-style = native