Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/18 02:41:56 UTC

svn commit: r1632714 - in /hive/branches/branch-0.14/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/exec/vector/ test/org/apache/hadoop/hive/q...

Author: gunther
Date: Sat Oct 18 00:41:55 2014
New Revision: 1632714

URL: http://svn.apache.org/r1632714
Log:
HIVE-8429: Add records in/out counters (Gunther Hagleitner, reviewed by Gopal V)

Modified:
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
    hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
    hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Oct 18 00:41:55 2014
@@ -109,6 +109,15 @@ public class FileSinkOperator extends Te
   private StructField bucketField; // field bucket is in in record id
   private StructObjectInspector recIdInspector; // OI for inspecting record id
   private IntObjectInspector bucketInspector; // OI for inspecting bucket id
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+
+  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT
+  }
 
   /**
    * RecordWriter.
@@ -249,7 +258,7 @@ public class FileSinkOperator extends Te
   private static final long serialVersionUID = 1L;
   protected transient FileSystem fs;
   protected transient Serializer serializer;
-  protected transient LongWritable row_count;
+  protected final transient LongWritable row_count = new LongWritable();
   private transient boolean isNativeTable = true;
 
   /**
@@ -352,7 +361,7 @@ public class FileSinkOperator extends Te
         prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
             jc.getPartitionerClass(), null);
       }
-      row_count = new LongWritable();
+
       if (dpCtx != null) {
         dpSetup();
       }
@@ -381,6 +390,13 @@ public class FileSinkOperator extends Te
         bucketField = recIdInspector.getAllStructFieldRefs().get(1);
         bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
       }
+
+      String context = jc.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_OUT + context, row_count);
+
       initializeChildren(hconf);
     } catch (HiveException e) {
       throw e;
@@ -657,9 +673,9 @@ public class FileSinkOperator extends Te
         fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
       }
 
-
-      if (row_count != null) {
-        row_count.set(row_count.get() + 1);
+      if (++numRows == cntr) {
+        cntr *= 10;
+        LOG.info(toString() + ": records written - " + numRows);
       }
 
       int writerOffset = findWriterOffset(row);
@@ -921,6 +937,9 @@ public class FileSinkOperator extends Te
   @Override
   public void closeOp(boolean abort) throws HiveException {
 
+    row_count.set(numRows);
+    LOG.info(toString() + ": records written - " + numRows);    
+
     if (!bDynParts && !filesCreated) {
       createBucketFiles(fsp);
     }
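
The hunk above swaps the old per-row "row_count.set(row_count.get() + 1)" for a cheaper pattern: a plain long is incremented on every row, a progress line is logged only when the count reaches the next power of ten, and the LongWritable counter is published once in closeOp(). A minimal standalone sketch of the pattern (class and method names here are illustrative, not the Hive API):

    // Power-of-ten progress logging, as introduced in FileSinkOperator.
    public class ThrottledRowCounter {
      private long numRows = 0;  // rows seen so far
      private long cntr = 1;     // next count at which to log: 1, 10, 100, ...

      /** Called once per row; logs only at 1, 10, 100, 1000, ... rows. */
      public void onRow() {
        if (++numRows == cntr) {
          cntr *= 10;
          System.out.println("records written - " + numRows);
        }
      }

      /** Called on close; publishes the final total exactly once. */
      public long close() {
        System.out.println("records written - " + numRows);
        return numRows; // the patch stores this into the row_count LongWritable
      }
    }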

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Sat Oct 18 00:41:55 2014
@@ -37,29 +37,14 @@ public class FilterOperator extends Oper
     Serializable {
 
   private static final long serialVersionUID = 1L;
-
-  /**
-   * Counter.
-   *
-   */
-  public static enum Counter {
-    FILTERED, PASSED
-  }
-
-  protected final transient LongWritable filtered_count;
-  protected final transient LongWritable passed_count;
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
-  private transient int consecutiveFails;
   private transient int consecutiveSearches;
   private transient IOContext ioContext;
   protected transient int heartbeatInterval;
 
   public FilterOperator() {
     super();
-    filtered_count = new LongWritable();
-    passed_count = new LongWritable();
-    consecutiveFails = 0;
     consecutiveSearches = 0;
   }
 
@@ -73,8 +58,6 @@ public class FilterOperator extends Oper
         conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
       }
 
-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
       ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
     } catch (Throwable e) {
@@ -135,17 +118,6 @@ public class FilterOperator extends Oper
         .getPrimitiveJavaObject(condition);
     if (Boolean.TRUE.equals(ret)) {
       forward(row, rowInspector);
-      passed_count.set(passed_count.get() + 1);
-      consecutiveFails = 0;
-    } else {
-      filtered_count.set(filtered_count.get() + 1);
-      consecutiveFails++;
-
-      // In case of a lot of consecutive failures, send a heartbeat in order to
-      // avoid timeout
-      if (((consecutiveFails % heartbeatInterval) == 0) && (reporter != null)) {
-        reporter.progress();
-      }
     }
   }
 

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Sat Oct 18 00:41:55 2014
@@ -63,7 +63,7 @@ public class JoinOperator extends Common
       skewJoinKeyContext.initiliaze(hconf);
       skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
     }
-    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
+    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs);
   }
 
   @Override

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Sat Oct 18 00:41:55 2014
@@ -75,10 +75,16 @@ public class MapOperator extends Operato
    *
    */
   public static enum Counter {
-    DESERIALIZE_ERRORS
+    DESERIALIZE_ERRORS,
+    RECORDS_IN
   }
 
   private final transient LongWritable deserialize_error_count = new LongWritable();
+  private final transient LongWritable recordCounter = new LongWritable();
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+  protected final boolean isInfoEnabled = LOG.isInfoEnabled();
+  protected final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
   private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
@@ -362,7 +368,7 @@ public class MapOperator extends Operato
 
         for (String onealias : aliases) {
           Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-          if (LOG.isDebugEnabled()) {
+          if (isDebugEnabled) {
             LOG.debug("Adding alias " + onealias + " to work list for file "
                + onefile);
           }
@@ -380,8 +386,10 @@ public class MapOperator extends Operato
           if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
             children.add(op);
             childrenOpToOpCtxMap.put(op, opCtx);
-            LOG.info("dump " + op + " "
+            if (isInfoEnabled) {
+              LOG.info("dump " + op + " "
                 + opCtxMap.get(inp).rowObjectInspector.getTypeName());
+            }
           }
           current = opCtx;  // just need for TestOperators.testMapOperator
         }
@@ -406,7 +414,13 @@ public class MapOperator extends Operato
   public void initializeOp(Configuration hconf) throws HiveException {
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+
+    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    if (context != null && !context.isEmpty()) {
+      context = "_" + context.replace(" ","_");
+    }
+    statsMap.put(Counter.RECORDS_IN + context, recordCounter);
 
     List<Operator<? extends OperatorDesc>> children = getChildOperators();
 
@@ -451,6 +465,7 @@ public class MapOperator extends Operato
         op.close(abort);
       }
     }
+    recordCounter.set(numRows);
   }
 
   // Find context for current input file
@@ -473,7 +488,9 @@ public class MapOperator extends Operato
         MapOpCtx context = opCtxMap.get(inp);
         if (context != null) {
           current = context;
-          LOG.info("Processing alias " + onealias + " for file " + onefile);
+          if (isInfoEnabled) {
+            LOG.info("Processing alias " + onealias + " for file " + onefile);
+          }
           return;
         }
       }
@@ -533,6 +550,13 @@ public class MapOperator extends Operato
     // The row has been converted to comply with table schema, irrespective of partition schema.
     // So, use tblOI (and not partOI) for forwarding
     try {
+      numRows++;
+      if (isInfoEnabled) {
+        if (numRows == cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records read - " + numRows);
+        }
+      }
       forward(row, current.rowObjectInspector);
     } catch (Exception e) {
       // Serialize the row and output the error message.
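
Besides registering the new RECORDS_IN counter, this file hoists the LOG.isInfoEnabled()/LOG.isDebugEnabled() checks into fields evaluated once, so the per-row path pays only a boolean test. A small sketch of that guard pattern, assuming the log level does not change mid-task (names illustrative):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // Sketch: hoist the log-level check out of the per-row hot path.
    public class GuardedRowLogging {
      private static final Log LOG = LogFactory.getLog(GuardedRowLogging.class);
      // Evaluated once at construction instead of once per row.
      private final boolean isInfoEnabled = LOG.isInfoEnabled();

      private long numRows = 0;
      private long cntr = 1;

      void onRow() {
        numRows++;
        if (isInfoEnabled && numRows == cntr) {
          cntr *= 10;
          LOG.info("records read - " + numRows);
        }
      }
    }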

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sat Oct 18 00:41:55 2014
@@ -61,6 +61,7 @@ public abstract class Operator<T extends
 
   public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
   public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+  public static final String CONTEXT_NAME_KEY = "__hive.context.name";
 
   private transient Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -210,7 +211,7 @@ public abstract class Operator<T extends
 
   // non-bean ..
 
-  protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+  protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
   @SuppressWarnings("rawtypes")
   protected transient OutputCollector out;
   protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -287,9 +288,9 @@ public abstract class Operator<T extends
     }
   }
 
-  public Map<Enum<?>, Long> getStats() {
-    HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
-    for (Enum<?> one : statsMap.keySet()) {
+  public Map<String, Long> getStats() {
+    HashMap<String, Long> ret = new HashMap<String, Long>();
+    for (String one : statsMap.keySet()) {
       ret.put(one, Long.valueOf(statsMap.get(one).get()));
     }
     return (ret);
@@ -807,7 +808,7 @@ public abstract class Operator<T extends
   }
 
   public void resetStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
     }
   }
@@ -840,7 +841,7 @@ public abstract class Operator<T extends
   }
 
   public void logStats() {
-    for (Enum<?> e : statsMap.keySet()) {
+    for (String e : statsMap.keySet()) {
       LOG.info(e.toString() + ":" + statsMap.get(e).toString());
     }
   }
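
This Operator change is what enables the rest of the patch: with statsMap keyed by String instead of Enum<?>, dynamically suffixed names such as "RECORDS_IN_Map_1" can live alongside plain enum names (stored via toString(), as the JoinOperator and ScriptOperator hunks show). A hedged sketch of the snapshot logic getStats() now performs (demo class is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.LongWritable;

    public class StatsSnapshotDemo {
      public static void main(String[] args) {
        // String keys admit both plain enum names and suffixed ones.
        Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
        statsMap.put("DESERIALIZE_ERRORS", new LongWritable(0));
        statsMap.put("RECORDS_IN_Map_1", new LongWritable(42));

        // Mirrors getStats(): snapshot the live LongWritables as plain Longs.
        Map<String, Long> snapshot = new HashMap<String, Long>();
        for (Map.Entry<String, LongWritable> e : statsMap.entrySet()) {
          snapshot.put(e.getKey(), Long.valueOf(e.getValue().get()));
        }
        System.out.println(snapshot); // e.g. {RECORDS_IN_Map_1=42, DESERIALIZE_ERRORS=0}
      }
    }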

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Oct 18 00:41:55 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
@@ -67,6 +68,13 @@ public class ReduceSinkOperator extends 
     PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }
 
+  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT_INTERMEDIATE
+  }
+
   private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -146,9 +154,19 @@ public class ReduceSinkOperator extends 
   private StructObjectInspector recIdInspector; // OI for the record identifier
   private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
 
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+  private final transient LongWritable recordCounter = new LongWritable();
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
+      String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
       List<ExprNodeDesc> keys = conf.getKeyCols();
 
       if (isDebugEnabled) {
@@ -508,6 +526,13 @@ public class ReduceSinkOperator extends 
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
     if (null != out) {
+      numRows++;
+      if (isInfoEnabled) {
+        if (numRows == cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records written - " + numRows);
+        }
+      }
       out.collect(keyWritable, valueWritable);
     }
   }
@@ -537,6 +562,10 @@ public class ReduceSinkOperator extends 
     }
     super.closeOp(abort);
     out = null;
+    if (isInfoEnabled) {
+      LOG.info(toString() + ": records written - " + numRows);
+    }
+    recordCounter.set(numRows);
   }
 
   /**

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Sat Oct 18 00:41:55 2014
@@ -236,8 +236,8 @@ public class ScriptOperator extends Oper
   protected void initializeOp(Configuration hconf) throws HiveException {
     firstRow = true;
 
-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
-    statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+    statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count);
 
     try {
       this.hconf = hconf;

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Sat Oct 18 00:41:55 2014
@@ -26,8 +26,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -72,9 +75,6 @@ public class ExecMapper extends MapReduc
   private static boolean done;
 
   // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
-  private long numRows = 0;
-  private long nextCntr = 1;
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
@@ -84,8 +84,6 @@ public class ExecMapper extends MapReduc
   public void configure(JobConf job) {
     execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
     isLogInfoEnabled = l4j.isInfoEnabled();
 
@@ -176,15 +174,6 @@ public class ExecMapper extends MapReduc
         // Since there is no concept of a group, we don't invoke
         // startGroup/endGroup for a mapper
         mo.process((Writable)value);
-        if (isLogInfoEnabled) {
-          numRows++;
-          if (numRows == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecMapper: processing " + numRows
-                + " rows: used memory = " + used_memory);
-            nextCntr = getNextCntr(numRows);
-          }
-        }
       }
     } catch (Throwable e) {
       abort = true;
@@ -198,18 +187,6 @@ public class ExecMapper extends MapReduc
     }
   }
 
-
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   @Override
   public void close() {
     // No row was processed
@@ -245,13 +222,7 @@ public class ExecMapper extends MapReduc
         }
       }
 
-      if (isLogInfoEnabled) {
-        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
-            + used_memory);
-      }
-
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
       mo.preorderMap(rps);
       return;
     } catch (Exception e) {
@@ -288,17 +259,21 @@ public class ExecMapper extends MapReduc
    */
   public static class ReportStats implements Operator.OperatorFunc {
     private final Reporter rp;
+    private final Configuration conf;
+    private final String groupName;
 
-    public ReportStats(Reporter rp) {
+    public ReportStats(Reporter rp, Configuration conf) {
       this.rp = rp;
+      this.conf = conf;
+      this.groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     }
 
     @Override
     public void func(Operator op) {
-      Map<Enum<?>, Long> opStats = op.getStats();
-      for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+      Map<String, Long> opStats = op.getStats();
+      for (Map.Entry<String, Long> e : opStats.entrySet()) {
         if (rp != null) {
-          rp.incrCounter(e.getKey(), e.getValue());
+          rp.incrCounter(groupName, e.getKey(), e.getValue());
         }
       }
     }
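
ReportStats now takes the job Configuration so it can resolve a counter group name (HiveConf.ConfVars.HIVECOUNTERGROUP) and publish every String-keyed stat under that group via the three-argument Reporter.incrCounter overload. A sketch of the publishing side (class name illustrative; the Reporter signature is the real mapred API):

    import java.util.Map;
    import org.apache.hadoop.mapred.Reporter;

    public class StatsPublisher {
      private final Reporter rp;
      private final String groupName; // resolved from HIVECOUNTERGROUP in the patch

      public StatsPublisher(Reporter rp, String groupName) {
        this.rp = rp;
        this.groupName = groupName;
      }

      /** Publish each operator stat as (group, counterName, value). */
      public void publish(Map<String, Long> opStats) {
        if (rp == null) {
          return; // no reporter, e.g. in local or test runs
        }
        for (Map.Entry<String, Long> e : opStats.entrySet()) {
          rp.incrCounter(groupName, e.getKey(), e.getValue());
        }
      }
    }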

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sat Oct 18 00:41:55 2014
@@ -70,8 +70,6 @@ public class ExecReducer extends MapRedu
   private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
-  // used to log memory usage periodically
-  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
   // Input value serde needs to be an array to support different SerDe
   // for different tags
   private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
@@ -86,8 +84,6 @@ public class ExecReducer extends MapRedu
   private Reporter rp;
   private boolean abort = false;
   private boolean isTagged = false;
-  private long cntr = 0;
-  private long nextCntr = 1;
   private TableDesc keyTableDesc;
   private TableDesc[] valueTableDesc;
   private ObjectInspector[] rowObjectInspector;
@@ -103,8 +99,6 @@ public class ExecReducer extends MapRedu
     ObjectInspector keyObjectInspector;
 
     if (isInfoEnabled) {
-      LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
       try {
         LOG.info("conf classpath = "
             + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
@@ -245,17 +239,7 @@ public class ExecReducer extends MapRedu
         row.clear();
         row.add(keyObject);
         row.add(valueObject[tag]);
-        if (isInfoEnabled) {
-          cntr++;
-          if (cntr == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            if (isInfoEnabled) {
-              LOG.info("ExecReducer: processing " + cntr
-                  + " rows: used memory = " + used_memory);
-            }
-            nextCntr = getNextCntr(cntr);
-          }
-        }
+
         try {
           reducer.processOp(row, tag);
         } catch (Exception e) {
@@ -283,17 +267,6 @@ public class ExecReducer extends MapRedu
     }
   }
 
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   @Override
   public void close() {
 
@@ -310,13 +283,9 @@ public class ExecReducer extends MapRedu
         }
         reducer.endGroup();
       }
-      if (isInfoEnabled) {
-        LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
-            + memoryMXBean.getHeapMemoryUsage().getUsed());
-      }
 
       reducer.close(abort);
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Oct 18 00:41:55 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -178,6 +179,8 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
+    conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName());
+
     if (mapWork.getNumMapTasks() != null) {
       // Is this required ?
       conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
@@ -653,6 +656,8 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
+    conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName());
+
     // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
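
These two DagUtils hunks are where the counter suffix originates: each Tez vertex conf gets CONTEXT_NAME_KEY set to the work name (for example "Map 1" or "Reducer 2"), and the operator hunks above read it back when registering counters. A round-trip sketch under that assumption (demo class is illustrative):

    import org.apache.hadoop.mapred.JobConf;

    public class ContextKeyRoundTrip {
      // Same literal as Operator.CONTEXT_NAME_KEY in this patch.
      public static final String CONTEXT_NAME_KEY = "__hive.context.name";

      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set(CONTEXT_NAME_KEY, "Map 1"); // done per-vertex in DagUtils

        // What MapOperator/ReduceSinkOperator/FileSinkOperator do on init:
        String context = conf.get(CONTEXT_NAME_KEY, "");
        if (context != null && !context.isEmpty()) {
          context = "_" + context.replace(" ", "_");
        }
        System.out.println("RECORDS_IN" + context); // RECORDS_IN_Map_1
      }
    }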

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Sat Oct 18 00:41:55 2014
@@ -269,11 +269,7 @@ public class MapRecordProcessor extends 
   @Override
   void run() throws Exception {
 
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
+    while (sources[position].pushRecord()) {}
   }
 
   @Override
@@ -305,10 +301,7 @@ public class MapRecordProcessor extends 
         }
       }
 
-      if (isLogInfoEnabled) {
-        logCloseInfo();
-      }
-      ReportStats rps = new ReportStats(reporter);
+      ReportStats rps = new ReportStats(reporter, jconf);
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Sat Oct 18 00:41:55 2014
@@ -156,10 +156,7 @@ public class MergeFileRecordProcessor ex
       }
       mergeOp.close(abort);
 
-      if (isLogInfoEnabled) {
-        logCloseInfo();
-      }
-      ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter);
+      ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf);
       mergeOp.preorderMap(rps);
     } catch (Exception e) {
       if (!abort) {
@@ -190,9 +187,6 @@ public class MergeFileRecordProcessor ex
         row[0] = key;
         row[1] = value;
         mergeOp.processOp(row, 0);
-        if (isLogInfoEnabled) {
-          logProgress();
-        }
       }
     } catch (Throwable e) {
       abort = true;

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Sat Oct 18 00:41:55 2014
@@ -52,13 +52,10 @@ public abstract class RecordProcessor  {
 
 
   // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
   protected boolean isLogInfoEnabled = false;
   protected boolean isLogTraceEnabled = false;
   protected MRTaskReporter reporter;
 
-  private long numRows = 0;
-  private long nextUpdateCntr = 1;
   protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected String CLASS_NAME = RecordProcessor.class.getName();
 
@@ -79,11 +76,6 @@ public abstract class RecordProcessor  {
     this.outputs = outputs;
     this.processorContext = processorContext;
 
-    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
     isLogInfoEnabled = l4j.isInfoEnabled();
     isLogTraceEnabled = l4j.isTraceEnabled();
 
@@ -110,37 +102,6 @@ public abstract class RecordProcessor  {
 
   abstract void close();
 
-  /**
-   * Log information to be logged at the end
-   */
-  protected void logCloseInfo() {
-    long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
-  }
-
-  /**
-   * Log number of records processed and memory used after processing many records
-   */
-  protected void logProgress() {
-    numRows++;
-    if (numRows == nextUpdateCntr) {
-      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
-      nextUpdateCntr = getNextUpdateRecordCounter(numRows);
-    }
-  }
-
-  private long getNextUpdateRecordCounter(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   protected void createOutputMap() {
     Preconditions.checkState(outMap == null, "Outputs should only be setup once");
     outMap = Maps.newHashMap();

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Sat Oct 18 00:41:55 2014
@@ -165,11 +165,7 @@ public class ReduceRecordProcessor  exte
     }
 
     // run the operator pipeline
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
+    while (sources[position].pushRecord()) {}
   }
 
   /**
@@ -208,7 +204,7 @@ public class ReduceRecordProcessor  exte
           dummyOp.close(abort);
         }
       }
-      ReportStats rps = new ReportStats(reporter);
+      ReportStats rps = new ReportStats(reporter, jconf);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Sat Oct 18 00:41:55 2014
@@ -60,8 +60,6 @@ public class VectorFilterOperator extend
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
-      statsMap.put(Counter.FILTERED, filtered_count);
-      statsMap.put(Counter.PASSED, passed_count);
     } catch (Throwable e) {
       throw new HiveException(e);
     }

Modified: hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Sat Oct 18 00:41:55 2014
@@ -40,6 +40,13 @@ public class VectorMapOperator extends M
     // The row has been converted to comply with table schema, irrespective of partition schema.
     // So, use tblOI (and not partOI) for forwarding
     try {
+      if (isInfoEnabled) {
+        numRows += ((VectorizedRowBatch)value).size;
+        while (numRows > cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records read - " + numRows);
+        }
+      }
       forward(value, current.getRowObjectInspector());
     } catch (Exception e) {
       throw new HiveException("Hive Runtime Error while processing row ", e);
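
Note the while (rather than if) in the vectorized path: a VectorizedRowBatch carries up to roughly a thousand rows, so a single arriving batch can jump past several power-of-ten thresholds at once, and the threshold must catch up in a loop. A standalone sketch of that variant (class name illustrative):

    // Batch-increment variant of the throttled counter.
    public class BatchRowCounter {
      private long numRows = 0;
      private long cntr = 1;

      void onBatch(int batchSize) {
        numRows += batchSize;
        while (numRows > cntr) {
          cntr *= 10;
          System.out.println("records read - " + numRows);
        }
      }

      public static void main(String[] args) {
        new BatchRowCounter().onBatch(1024); // crosses 1, 10, 100, 1000: logs four times
      }
    }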

Modified: hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1632714&r1=1632713&r2=1632714&view=diff
==============================================================================
--- hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Sat Oct 18 00:41:55 2014
@@ -109,55 +109,6 @@ public class TestOperators extends TestC
     }
   }
 
-  public void testBaseFilterOperator() throws Throwable {
-    try {
-      System.out.println("Testing Filter Operator");
-      ExprNodeDesc col0 = TestExecDriver.getStringColumn("col0");
-      ExprNodeDesc col1 = TestExecDriver.getStringColumn("col1");
-      ExprNodeDesc col2 = TestExecDriver.getStringColumn("col2");
-      ExprNodeDesc zero = new ExprNodeConstantDesc("0");
-      ExprNodeDesc func1 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc(">", col2, col1);
-      ExprNodeDesc func2 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc("==", col0, zero);
-      ExprNodeDesc func3 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc("and", func1, func2);
-      assert (func3 != null);
-      FilterDesc filterCtx = new FilterDesc(func3, false);
-
-      // Configuration
-      Operator<FilterDesc> op = OperatorFactory.get(FilterDesc.class);
-      op.setConf(filterCtx);
-
-      // runtime initialization
-      op.initialize(new JobConf(TestOperators.class),
-          new ObjectInspector[] {r[0].oi});
-
-      for (InspectableObject oner : r) {
-        op.processOp(oner.o, 0);
-      }
-
-      Map<Enum<?>, Long> results = op.getStats();
-      System.out.println("filtered = "
-          + results.get(FilterOperator.Counter.FILTERED));
-      assertEquals(Long.valueOf(4), results
-          .get(FilterOperator.Counter.FILTERED));
-      System.out.println("passed = "
-          + results.get(FilterOperator.Counter.PASSED));
-      assertEquals(Long.valueOf(1), results.get(FilterOperator.Counter.PASSED));
-
-      /*
-       * for(Enum e: results.keySet()) { System.out.println(e.toString() + ":" +
-       * results.get(e)); }
-       */
-      System.out.println("Filter Operator ok");
-
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
-  }
-
   private void testTaskIds(String [] taskIds, String expectedAttemptId, String expectedTaskId) {
     Configuration conf = new JobConf(TestOperators.class);
     for (String one: taskIds) {