You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/11/02 13:00:27 UTC

svn commit: r1404933 - in /hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/org/apache/hadoop/hive/ql/hooks/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/

Author: namit
Date: Fri Nov  2 12:00:26 2012
New Revision: 1404933

URL: http://svn.apache.org/viewvc?rev=1404933&view=rev
Log:
HIVE-3570 Add/fix facility to collect operator specific statistics in Hive + add hash-in/hash-out
counter for GroupBy Optr (Satadru Pan via namit)


Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java
    hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q
    hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out
Modified:
    hive/trunk/build-common.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java

Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Fri Nov  2 12:00:26 2012
@@ -57,7 +57,7 @@
   <property name="test.output" value="true"/>
   <property name="test.junit.output.format" value="xml"/>
   <property name="test.junit.output.usefile" value="true"/>
-  <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q"/>
+  <property name="minimr.query.files" value="input16_cc.q,scriptfile1.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q"/>
   <property name="minimr.query.negative.files" value="cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q" />
   <property name="test.silent" value="true"/>
   <property name="hadoopVersion" value="${hadoop.version.ant-internal}"/>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Nov  2 12:00:26 2012
@@ -58,12 +58,12 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -78,10 +78,11 @@ public class GroupByOperator extends Ope
 
   private static final Log LOG = LogFactory.getLog(GroupByOperator.class
       .getName());
-
   private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
 
+  public static final String counterNameHashOut = "COUNT_HASH_OUT";
+
   protected transient ExprNodeEvaluator[] keyFields;
   protected transient ObjectInspector[] keyObjectInspectors;
 
@@ -1068,6 +1069,12 @@ public class GroupByOperator extends Ope
   public void closeOp(boolean abort) throws HiveException {
     if (!abort) {
       try {
+        // put the hash related stats in statsMap if applicable, so that they
+        // are sent to jt as counters
+        if (hashAggr) {
+          incrCounter(counterNameHashOut, numRowsHashTbl);
+        }
+
         // If there is no grouping key and no row came to this operator
         if (firstRow && (keyFields.length == 0)) {
           firstRow = false;
@@ -1127,6 +1134,13 @@ public class GroupByOperator extends Ope
     }
   }
 
+  @Override
+  protected List<String> getAdditionalCounters() {
+    List<String> ctrList = new ArrayList<String>();
+    ctrList.add(getWrappedCounterName(counterNameHashOut));
+    return ctrList;
+  }
+
   // Group by contains the columns needed - no need to aggregate from children
   public List<String> genColLists(
       HashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1404933&r1=1404932&r2=1404933&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Nov  2 12:00:26 2012
@@ -548,11 +548,9 @@ public abstract class Operator<T extends
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
-    if (counterNameToEnum != null) {
-      incrCounter(numInputRowsCntr, inputRows);
-      incrCounter(numOutputRowsCntr, outputRows);
-      incrCounter(timeTakenCntr, totalTime);
-    }
+    incrCounter(numInputRowsCntr, inputRows);
+    incrCounter(numOutputRowsCntr, outputRows);
+    incrCounter(timeTakenCntr, totalTime);
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
@@ -766,10 +764,8 @@ public abstract class Operator<T extends
       throws HiveException {
 
     if ((++outputRows % 1000) == 0) {
-      if (counterNameToEnum != null) {
-        incrCounter(numOutputRowsCntr, outputRows);
-        outputRows = 0;
-      }
+      incrCounter(numOutputRowsCntr, outputRows);
+      outputRows = 0;
     }
 
     if (isLogInfoEnabled) {
@@ -1107,16 +1103,13 @@ public abstract class Operator<T extends
    */
   private void preProcessCounter() {
     inputRows++;
-
-    if (counterNameToEnum != null) {
-      if ((inputRows % 1000) == 0) {
-        incrCounter(numInputRowsCntr, inputRows);
-        incrCounter(timeTakenCntr, totalTime);
-        inputRows = 0;
-        totalTime = 0;
-      }
-      beginTime = System.currentTimeMillis();
+    if ((inputRows % 1000) == 0) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(timeTakenCntr, totalTime);
+      inputRows = 0;
+      totalTime = 0;
     }
+    beginTime = System.currentTimeMillis();
   }
 
   /**
@@ -1135,7 +1128,11 @@ public abstract class Operator<T extends
    * @param amount
    */
   protected void incrCounter(String name, long amount) {
-    String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
+    if(counterNameToEnum == null) {
+      return;
+    }
+
+    String counterName = getWrappedCounterName(name);
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
     // Currently, we maintain fixed number of counters per plan - in case of a
@@ -1161,6 +1158,10 @@ public abstract class Operator<T extends
     return operatorId;
   }
 
+  public final String getWrappedCounterName(String ctrName) {
+    return String.format(counterNameFormat, getOperatorId(), ctrName);
+  }
+
   public void initOperatorId() {
     setOperatorId(getName() + "_" + this.id);
   }
@@ -1216,7 +1217,7 @@ public abstract class Operator<T extends
       return false;
     }
 
-    String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr;
+    String counterName = getWrappedCounterName(fatalErrorCntr);
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
     // Currently, we maintain fixed number of counters per plan - in case of a
@@ -1292,14 +1293,16 @@ public abstract class Operator<T extends
   protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
   protected static String timeTakenCntr = "TIME_TAKEN";
   protected static String fatalErrorCntr = "FATAL_ERROR";
+  private static String counterNameFormat = "CNTR_NAME_%s_%s";
 
   public void initializeCounters() {
     initOperatorId();
     counterNames = new ArrayList<String>();
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numInputRowsCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + numOutputRowsCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + timeTakenCntr);
-    counterNames.add("CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr);
+    counterNames.add(getWrappedCounterName(numInputRowsCntr));
+    counterNames.add(getWrappedCounterName(numOutputRowsCntr));
+    counterNames.add(getWrappedCounterName(timeTakenCntr));
+    counterNames.add(getWrappedCounterName(fatalErrorCntr));
+    /* getAdditionalCounter should return Wrapped counters */
     List<String> newCntrs = getAdditionalCounters();
     if (newCntrs != null) {
       counterNames.addAll(newCntrs);
@@ -1308,9 +1311,11 @@ public abstract class Operator<T extends
 
   /*
    * By default, the list is empty - if an operator wants to add more counters,
-   * it should override this method and provide the new list.
+   * it should override this method and provide the new list. Counter names returned
+   * by this method should be wrapped counter names (i.e the strings should be passed
+   * through getWrappedCounterName).
    */
-  private List<String> getAdditionalCounters() {
+  protected List<String> getAdditionalCounters() {
     return null;
   }
 

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/OptrStatGroupByHook.java Fri Nov  2 12:00:26 2012
@@ -0,0 +1,109 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express  or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ **/
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+public class OptrStatGroupByHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    HiveConf conf = hookContext.getConf();
+
+    List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+
+    boolean enableProgress = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS);
+
+    /** For each task visit the opeartor tree and and if the operator is GROUPBY
+     *  then print the HASH_OUT Optr level stat value.
+     **/
+    if (completedTasks != null) {
+      for (TaskRunner taskRunner : completedTasks) {
+        Task<? extends Serializable> task = taskRunner.getTask();
+        if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
+          Set<Operator<? extends OperatorDesc>> optrSet = getOptrsForTask(task);
+          for (Operator<? extends OperatorDesc> optr : optrSet) {
+            if (optr.getType() == OperatorType.GROUPBY) {
+               printCounterValue(optr.getCounters());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void printCounterValue(HashMap<String, Long> ctrs) {
+    for (String ctrName : ctrs.keySet()) {
+      if (ctrName.contains("HASH_OUT")) {
+        SessionState.getConsole().printError(ctrName+"="+ctrs.get(ctrName));
+      }
+    }
+  }
+
+  private Set<Operator<? extends OperatorDesc>> getOptrsForTask(
+    Task<? extends Serializable> task) {
+
+    Collection<Operator<? extends OperatorDesc>> topOptrs = task.getTopOperators();
+    Set<Operator<? extends OperatorDesc>> allOptrs =
+      new HashSet<Operator<? extends OperatorDesc>>();
+    Queue<Operator<? extends OperatorDesc>> opsToVisit =
+      new LinkedList<Operator<? extends OperatorDesc>>();
+    if(topOptrs != null) {
+      opsToVisit.addAll(topOptrs);
+      addChildOptrs(opsToVisit, allOptrs);
+    }
+
+    return allOptrs;
+  }
+
+  private void addChildOptrs(
+    Queue<Operator<? extends OperatorDesc>> opsToVisit,
+    Set<Operator<? extends OperatorDesc>> opsVisited) {
+
+    if(opsToVisit == null || opsVisited == null) {
+      return;
+    }
+
+    while (opsToVisit.peek() != null) {
+      Operator<? extends OperatorDesc> op = opsToVisit.remove();
+      opsVisited.add(op);
+      if (op.getChildOperators() != null) {
+        for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+          if (!opsVisited.contains(childOp)) {
+            opsToVisit.add(childOp);
+          }
+        }
+      }
+    }
+  }
+}

Added: hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/optrstat_groupby.q Fri Nov  2 12:00:26 2012
@@ -0,0 +1,6 @@
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.OptrStatGroupByHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+-- This test executes the OptrStatGroupBy hook which prints the optr level
+-- stats of GROUPBY optr present is the plan of below query
+SELECT count(1) FROM src;

Added: hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out?rev=1404933&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/optrstat_groupby.q.out Fri Nov  2 12:00:26 2012
@@ -0,0 +1,8 @@
+PREHOOK: query: -- This test executes the OptrStatGroupBy hook which prints the optr level
+-- stats of GROUPBY optr present is the plan of below query
+SELECT count(1) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+CNTR_NAME_GBY_2_COUNT_HASH_OUT=1
+500