You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/01/17 17:19:42 UTC

hive git commit: HIVE-20170: Improve JoinOperator "rows for join key" Logging (BELUGA BEHR via Peter Vary)

Repository: hive
Updated Branches:
  refs/heads/master 8e7c3b340 -> f09db52fd


HIVE-20170: Improve JoinOperator "rows for join key" Logging (BELUGA BEHR via Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f09db52f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f09db52f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f09db52f

Branch: refs/heads/master
Commit: f09db52fdd3e1fc4ebadc41c36b8347b9a5723a5
Parents: 8e7c3b3
Author: BELUGA BEHR <da...@gmail.com>
Authored: Thu Jan 17 18:19:04 2019 +0100
Committer: Peter Vary <pv...@cloudera.com>
Committed: Thu Jan 17 18:19:04 2019 +0100

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/CommonJoinOperator.java | 26 ++++++++++++++++----
 .../hadoop/hive/ql/exec/JoinOperator.java       | 24 +++++++++---------
 2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 3762ee5..1c32588 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Join operator implementation.
  */
@@ -145,6 +147,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
   int joinCacheSize = 0;
   long nextSz = 0;
   transient Byte lastAlias = null;
+  private long logEveryNRows = 0L;
 
   transient boolean handleSkewJoin = false;
 
@@ -170,6 +173,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
     this.joinEmitInterval = clone.joinEmitInterval;
     this.joinCacheSize = clone.joinCacheSize;
     this.nextSz = clone.nextSz;
+    this.logEveryNRows = clone.logEveryNRows;
     this.childOperators = clone.childOperators;
     this.parentOperators = clone.parentOperators;
     this.done = clone.done;
@@ -294,6 +298,9 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
     joinCacheSize = HiveConf.getIntVar(hconf,
         HiveConf.ConfVars.HIVEJOINCACHESIZE);
 
+    logEveryNRows = HiveConf.getLongVar(hconf,
+        HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
+
     // construct dummy null row (indicating empty table) and
     // construct spill table serde which is used if input is too
     // large to fit into main memory.
@@ -394,13 +401,22 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
     super.startGroup();
   }
 
+  /**
+   * Determine the frequency with which to emit a log message instead of
+   * one for every event.
+   *
+   * @param sz The current number of events
+   * @return The next event count to emit a log message
+   */
   protected long getNextSize(long sz) {
-    // A very simple counter to keep track of join entries for a key
-    if (sz >= 100000) {
-      return sz + 100000;
+    Preconditions.checkArgument(sz >= 0L);
+    // If no log interval is configured (0), log at 1, 10, 100, 1000, ..., capped at 100000
+    if (this.logEveryNRows == 0L) {
+      final long next = (long) Math.pow(10.0, Math.ceil(Math.log10(sz + 1)));
+      return Math.min(100000L, next);
     }
-
-    return 2 * sz;
+    // Log every N rows
+    return ((sz / this.logEveryNRows) + 1L) * this.logEveryNRows;
   }
 
   protected transient Byte alias;

http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index e995ab7..451ba1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -82,10 +82,6 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
       lastAlias = alias;
       alias = (byte) tag;
 
-      if (!alias.equals(lastAlias)) {
-        nextSz = joinEmitInterval;
-      }
-
       List<Object> nr = getFilteredValue(alias, row);
 
       if (handleSkewJoin) {
@@ -93,7 +89,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
       }
 
       // number of rows for the key in the given table
-      long sz = storage[alias].rowCount();
+      final long sz = storage[alias].rowCount();
       StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
       StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
           .toString());
@@ -112,14 +108,16 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
           checkAndGenObject();
           storage[alias].clearRows();
         }
-      } else {
-        if (LOG.isInfoEnabled() && (sz == nextSz)) {
-          // Print a message if we reached at least 1000 rows for a join operand
-          // We won't print a message for the last join operand since the size
-          // will never goes to joinEmitInterval.
-          LOG.info("table " + alias + " has " + sz + " rows for join key " + keyObject);
-          nextSz = getNextSize(nextSz);
-        }
+      }
+
+      // The input is sorted by alias, so when an alias change is detected,
+      // reset the counter for the next join key in the stream
+      if (!alias.equals(lastAlias)) {
+        nextSz = getNextSize(0L);
+      }
+      if (sz == nextSz) {
+        LOG.info("Table {} has {} rows for join key {}", alias, sz, keyObject);
+        nextSz = getNextSize(nextSz);
       }
 
       // Add the value to the vector