Posted to commits@hive.apache.org by zs...@apache.org on 2009/04/23 23:56:11 UTC

svn commit: r768070 - in /hadoop/hive/branches/branch-0.3: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java

Author: zshao
Date: Thu Apr 23 21:56:11 2009
New Revision: 768070

URL: http://svn.apache.org/viewvc?rev=768070&view=rev
Log:
HIVE-440. Reducer and Join to print out number of rows processed. (Namit Jain via zshao)

Modified:
    hadoop/hive/branches/branch-0.3/CHANGES.txt
    hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
    hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java

Modified: hadoop/hive/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/CHANGES.txt?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.3/CHANGES.txt Thu Apr 23 21:56:11 2009
@@ -41,6 +41,9 @@
     HIVE-427. Add missing config parameters in hive-default.xml.
     (Namit Jain via zshao)
 
+    HIVE-440. Reducer and Join to print out number of rows processed.
+    (Namit Jain via zshao)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Thu Apr 23 21:56:11 2009
@@ -47,6 +47,8 @@
   private Reporter rp;
   private boolean abort = false;
   private boolean isTagged = false;
+  private long cntr = 0;
+  private long nextCntr = 1;
 
   private static String [] fieldNames;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
@@ -161,6 +163,11 @@
         row.add(keyObject);
         row.add(valueObject[tag]);
         row.add(tag);
+        cntr++;
+        if (cntr == nextCntr) {
+          l4j.info("ExecReducer: processing " + cntr + " rows");
+          nextCntr = getNextCntr(cntr);
+        }
         reducer.process(row, rowObjectInspector[tag]);
       }
 
@@ -170,6 +176,15 @@
     }
   }
 
+  private long getNextCntr(long cntr) {
+    // Returns the next row count at which the reducer should log progress: every
+    // 1 million rows, and at exponentially growing intervals (1, 10, 100, ...) before that
+    if (cntr >= 1000000)
+      return cntr + 1000000;
+    
+    return 10 * cntr;
+  }
+
   public void close() {
 
     // No row was processed
@@ -191,6 +206,7 @@
         l4j.trace("End Group");
         reducer.endGroup();
       }
+      l4j.info("ExecReducer: processed " + cntr + " rows");
       reducer.close(abort);
       reportStats rps = new reportStats (rp);
       reducer.preorderMap(rps);
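
For reference, the logging cadence that the ExecReducer change introduces can be
illustrated with a small standalone sketch (the class and method names below are
illustrative only, not part of the patch): a progress line is logged at 1, 10,
100, ... rows up to one million, and then once per additional million rows.

// Minimal standalone sketch of the ExecReducer row-count logging cadence.
// Assumes the same thresholds as getNextCntr() above; prints to stdout
// instead of using commons-logging.
public class RowCounterSketch {
  private long cntr = 0;
  private long nextCntr = 1;

  // Same policy as ExecReducer.getNextCntr(): exponential thresholds below
  // one million rows, then one threshold per additional million rows.
  private long getNextCntr(long cntr) {
    if (cntr >= 1000000) {
      return cntr + 1000000;
    }
    return 10 * cntr;
  }

  // Called once per reduce-side row.
  public void onRow() {
    cntr++;
    if (cntr == nextCntr) {
      System.out.println("ExecReducer: processing " + cntr + " rows");
      nextCntr = getNextCntr(cntr);
    }
  }

  public static void main(String[] args) {
    RowCounterSketch counter = new RowCounterSketch();
    for (int i = 0; i < 2500000; i++) {
      counter.onRow();
    }
    // Prints progress at 1, 10, 100, 1000, 10000, 100000, 1000000 and 2000000 rows.
  }
}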

Modified: hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Apr 23 21:56:11 2009
@@ -113,6 +113,8 @@
 
   HashMap<Byte, Vector<ArrayList<Object>>> storage;
   int joinEmitInterval = -1;
+  int nextSz = 0;
+  transient Byte lastAlias = null;
   
   public void initialize(Configuration hconf, Reporter reporter) throws HiveException {
     super.initialize(hconf, reporter);
@@ -189,6 +191,14 @@
 
   InspectableObject tempAliasInspectableObject = new InspectableObject();
 
+  private int getNextSize(int sz) {
+    // Returns the next size at which to warn about the number of join entries for a key
+    if (sz >= 100000)
+      return sz + 100000;
+    
+    return 2 * sz;
+  }
+
   public void process(Object row, ObjectInspector rowInspector)
       throws HiveException {
     try {
@@ -196,6 +206,9 @@
       aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
       Byte alias = (Byte) (tempAliasInspectableObject.o);
 
+      if ((lastAlias == null) || (!lastAlias.equals(alias)))
+        nextSz = joinEmitInterval;
+
       // get the expressions for that alias
       JoinExprMap exmap = joinExprs.get(alias);
       ExprNodeEvaluator[] valueFields = exmap.getValueFields();
@@ -206,27 +219,29 @@
         vField.evaluate(row, rowInspector, tempAliasInspectableObject);
         nr.add(tempAliasInspectableObject.o);
       }
+      
+      // number of rows for the key in the given table
+      int sz = storage.get(alias).size();
 
       // Are we consuming too much memory
-      if (storage.get(alias).size() == joinEmitInterval) {
-        if (alias == numValues - 1) {
-          // The input is sorted by alias, so if we are already in the last join
-          // operand,
+      if (alias == numValues - 1) {
+        if (sz == joinEmitInterval) {
+          // The input is sorted by alias, so if we are already in the last join operand,
           // we can emit some results now.
-          // Note this has to be done before adding the current row to the
-          // storage,
+          // Note this has to be done before adding the current row to the storage,
           // to preserve the correctness for outer joins.
           checkAndGenObject();
           storage.get(alias).clear();
-        } else {
-          // Output a warning if we reached at least 1000 rows for a join
-          // operand
+        }
+      } else {
+        if (sz == nextSz) {
+          // Output a warning if we reached at least 1000 rows for a join operand
           // We won't output a warning for the last join operand since the size
           // will never go past joinEmitInterval.
           InspectableObject io = new InspectableObject();
           keyField.evaluate(row, rowInspector, io);
-          LOG.warn("table " + alias
-              + " has more than joinEmitInterval rows for join key " + io.o);
+          LOG.warn("table " + alias + " has " + sz + " rows for join key " + io.o);
+          nextSz = getNextSize(nextSz);
         }
       }
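
For reference, the per-key warning cadence that the JoinOperator change introduces
can be illustrated with a similar standalone sketch (names below are illustrative
only, not part of the patch; 1000 is used here as an example joinEmitInterval):
for a non-last join operand, a warning is emitted once a join key accumulates
joinEmitInterval rows, then at doubling thresholds up to 100,000 rows, and then
once per additional 100,000 rows.

import java.util.ArrayList;
import java.util.List;

// Minimal standalone sketch of the JoinOperator per-key warning cadence for a
// single non-last join operand; prints to stdout instead of using LOG.warn.
public class JoinKeyWarnSketch {
  private final int joinEmitInterval;
  private int nextSz;
  private final List<Object> rowsForKey = new ArrayList<Object>();

  public JoinKeyWarnSketch(int joinEmitInterval) {
    this.joinEmitInterval = joinEmitInterval;
    this.nextSz = joinEmitInterval; // reset whenever the join alias changes
  }

  // Same policy as JoinOperator.getNextSize(): doubling thresholds below
  // 100,000 rows, then one threshold per additional 100,000 rows.
  private int getNextSize(int sz) {
    if (sz >= 100000) {
      return sz + 100000;
    }
    return 2 * sz;
  }

  // Called once per row of this operand for the current join key; the size is
  // checked before the row is added, matching the check-before-add order in
  // process() above.
  public void onRow(Object row, Object joinKey) {
    int sz = rowsForKey.size();
    if (sz == nextSz) {
      System.out.println("table has " + sz + " rows for join key " + joinKey);
      nextSz = getNextSize(nextSz);
    }
    rowsForKey.add(row);
  }

  public static void main(String[] args) {
    JoinKeyWarnSketch sketch = new JoinKeyWarnSketch(1000);
    for (int i = 0; i < 300000; i++) {
      sketch.onRow("row-" + i, "key");
    }
    // Warns at 1000, 2000, 4000, ..., 64000, 128000 and 228000 rows for the key.
  }
}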