You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/01/17 17:19:42 UTC
hive git commit: HIVE-20170: Improve JoinOperator "rows for join key"
Logging (BELUGA BEHR via Peter Vary)
Repository: hive
Updated Branches:
refs/heads/master 8e7c3b340 -> f09db52fd
HIVE-20170: Improve JoinOperator "rows for join key" Logging (BELUGA BEHR via Peter Vary)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f09db52f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f09db52f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f09db52f
Branch: refs/heads/master
Commit: f09db52fdd3e1fc4ebadc41c36b8347b9a5723a5
Parents: 8e7c3b3
Author: BELUGA BEHR <da...@gmail.com>
Authored: Thu Jan 17 18:19:04 2019 +0100
Committer: Peter Vary <pv...@cloudera.com>
Committed: Thu Jan 17 18:19:04 2019 +0100
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/CommonJoinOperator.java | 26 ++++++++++++++++----
.../hadoop/hive/ql/exec/JoinOperator.java | 24 +++++++++---------
2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
index 3762ee5..1c32588 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* Join operator implementation.
*/
@@ -145,6 +147,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
int joinCacheSize = 0;
long nextSz = 0;
transient Byte lastAlias = null;
+ private long logEveryNRows = 0L;
transient boolean handleSkewJoin = false;
@@ -170,6 +173,7 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
this.joinEmitInterval = clone.joinEmitInterval;
this.joinCacheSize = clone.joinCacheSize;
this.nextSz = clone.nextSz;
+ this.logEveryNRows = clone.logEveryNRows;
this.childOperators = clone.childOperators;
this.parentOperators = clone.parentOperators;
this.done = clone.done;
@@ -294,6 +298,9 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
joinCacheSize = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVEJOINCACHESIZE);
+ logEveryNRows = HiveConf.getLongVar(hconf,
+ HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
+
// construct dummy null row (indicating empty table) and
// construct spill table serde which is used if input is too
// large to fit into main memory.
@@ -394,13 +401,22 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends
super.startGroup();
}
+ /**
+ * Determine the frequency with which to emit a log message instead of
+ * emitting one for every event.
+ *
+ * @param sz The current number of events
+ * @return The next event count to emit a log message
+ */
protected long getNextSize(long sz) {
- // A very simple counter to keep track of join entries for a key
- if (sz >= 100000) {
- return sz + 100000;
+ Preconditions.checkArgument(sz >= 0L);
+ // If no log interval is configured (0), log at 1, 10, 100, 1000, ..., 100000 rows
+ if (this.logEveryNRows == 0L) {
+ final long next = (long) Math.pow(10.0, Math.ceil(Math.log10(sz + 1)));
+ return Math.min(100000L, next);
}
-
- return 2 * sz;
+ // Log every N rows
+ return ((sz / this.logEveryNRows) + 1L) * this.logEveryNRows;
}
protected transient Byte alias;
http://git-wip-us.apache.org/repos/asf/hive/blob/f09db52f/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index e995ab7..451ba1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -82,10 +82,6 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
lastAlias = alias;
alias = (byte) tag;
- if (!alias.equals(lastAlias)) {
- nextSz = joinEmitInterval;
- }
-
List<Object> nr = getFilteredValue(alias, row);
if (handleSkewJoin) {
@@ -93,7 +89,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
}
// number of rows for the key in the given table
- long sz = storage[alias].rowCount();
+ final long sz = storage[alias].rowCount();
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
.toString());
@@ -112,14 +108,16 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
checkAndGenObject();
storage[alias].clearRows();
}
- } else {
- if (LOG.isInfoEnabled() && (sz == nextSz)) {
- // Print a message if we reached at least 1000 rows for a join operand
- // We won't print a message for the last join operand since the size
- // will never goes to joinEmitInterval.
- LOG.info("table " + alias + " has " + sz + " rows for join key " + keyObject);
- nextSz = getNextSize(nextSz);
- }
+ }
+
+ // The input is sorted by alias, so when an alias change is detected,
+ // reset the counter for the next join key in the stream
+ if (!alias.equals(lastAlias)) {
+ nextSz = getNextSize(0L);
+ }
+ if (sz == nextSz) {
+ LOG.info("Table {} has {} rows for join key {}", alias, sz, keyObject);
+ nextSz = getNextSize(nextSz);
}
// Add the value to the vector