You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by zs...@apache.org on 2009/04/23 23:56:11 UTC
svn commit: r768070 - in /hadoop/hive/branches/branch-0.3: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
Author: zshao
Date: Thu Apr 23 21:56:11 2009
New Revision: 768070
URL: http://svn.apache.org/viewvc?rev=768070&view=rev
Log:
HIVE-440. Reducer and Join to print out number of rows processed. (Namit Jain via zshao)
Modified:
hadoop/hive/branches/branch-0.3/CHANGES.txt
hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
Modified: hadoop/hive/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/CHANGES.txt?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.3/CHANGES.txt Thu Apr 23 21:56:11 2009
@@ -41,6 +41,9 @@
HIVE-427. Add missing config parameters in hive-default.xml.
(Namit Jain via zshao)
+ HIVE-440. Reducer and Join to print out number of rows processed.
+ (Namit Jain via zshao)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Thu Apr 23 21:56:11 2009
@@ -47,6 +47,8 @@
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
+ private long cntr = 0;
+ private long nextCntr = 1;
private static String [] fieldNames;
public static final Log l4j = LogFactory.getLog("ExecReducer");
@@ -161,6 +163,10 @@
row.add(keyObject);
row.add(valueObject[tag]);
row.add(tag);
+ if (cntr == nextCntr) {
+ l4j.info("ExecReducer: processing " + cntr + " rows");
+ nextCntr = getNextCntr(cntr);
+ }
reducer.process(row, rowObjectInspector[tag]);
}
@@ -170,6 +176,15 @@
}
}
+ /**
+ * Returns the row count at which ExecReducer should next log a
+ * "processing N rows" progress line. Starting from nextCntr = 1 the
+ * schedule is 1, 10, 100, ..., 1000000, then every further 1000000 rows.
+ *
+ * NOTE(review): this patch adds the cntr field but never increments it,
+ * so the "cntr == nextCntr" check in the reduce path may never fire and
+ * close() would always report 0 rows processed -- confirm cntr is
+ * incremented once per row.
+ *
+ * @param cntr the row count that has just triggered a progress log line
+ * @return the next row count at which to log progress
+ */
+ private long getNextCntr(long cntr) {
+ // Grow the logging threshold tenfold while below 1 million rows, then
+ // log once for every additional 1 million rows.
+ if (cntr >= 1000000)
+ return cntr + 1000000;
+
+ return 10 * cntr;
+ }
+
public void close() {
// No row was processed
@@ -191,6 +206,7 @@
l4j.trace("End Group");
reducer.endGroup();
}
+ l4j.info("ExecReducer: processed " + cntr + " rows");
reducer.close(abort);
reportStats rps = new reportStats (rp);
reducer.preorderMap(rps);
Modified: hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=768070&r1=768069&r2=768070&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/branches/branch-0.3/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Apr 23 21:56:11 2009
@@ -113,6 +113,8 @@
HashMap<Byte, Vector<ArrayList<Object>>> storage;
int joinEmitInterval = -1;
+ int nextSz = 0;
+ transient Byte lastAlias = null;
public void initialize(Configuration hconf, Reporter reporter) throws HiveException {
super.initialize(hconf, reporter);
@@ -189,6 +191,14 @@
InspectableObject tempAliasInspectableObject = new InspectableObject();
+ /**
+ * Returns the next per-key row count at which process() logs a warning
+ * that a non-final join operand has accumulated many rows for one join key.
+ * Below 100000 the threshold doubles; from 100000 on it advances by a
+ * fixed 100000.
+ *
+ * A non-positive sz (e.g. joinEmitInterval still at its initial -1)
+ * yields a non-positive result, so the threshold never grows.
+ *
+ * NOTE(review): lastAlias is declared and read in this patch but never
+ * assigned in the visible hunks, so nextSz may be reset to
+ * joinEmitInterval on every row, defeating this back-off -- confirm.
+ *
+ * @param sz the warning threshold that has just been reached
+ * @return the next threshold at which to warn
+ */
+ private int getNextSize(int sz) {
+ // Double the threshold while below 100000 rows, then step by 100000.
+ if (sz >= 100000)
+ return sz + 100000;
+
+ return 2 * sz;
+ }
+
public void process(Object row, ObjectInspector rowInspector)
throws HiveException {
try {
@@ -196,6 +206,9 @@
aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
Byte alias = (Byte) (tempAliasInspectableObject.o);
+ if ((lastAlias == null) || (!lastAlias.equals(alias)))
+ nextSz = joinEmitInterval;
+
// get the expressions for that alias
JoinExprMap exmap = joinExprs.get(alias);
ExprNodeEvaluator[] valueFields = exmap.getValueFields();
@@ -206,27 +219,29 @@
vField.evaluate(row, rowInspector, tempAliasInspectableObject);
nr.add(tempAliasInspectableObject.o);
}
+
+ // number of rows for the key in the given table
+ int sz = storage.get(alias).size();
// Are we consuming too much memory
- if (storage.get(alias).size() == joinEmitInterval) {
- if (alias == numValues - 1) {
- // The input is sorted by alias, so if we are already in the last join
- // operand,
+ if (alias == numValues - 1) {
+ if (sz == joinEmitInterval) {
+ // The input is sorted by alias, so if we are already in the last join operand,
// we can emit some results now.
- // Note this has to be done before adding the current row to the
- // storage,
+ // Note this has to be done before adding the current row to the storage,
// to preserve the correctness for outer joins.
checkAndGenObject();
storage.get(alias).clear();
- } else {
- // Output a warning if we reached at least 1000 rows for a join
- // operand
+ }
+ } else {
+ if (sz == nextSz) {
+ // Output a warning if we reached at least 1000 rows for a join operand
// We won't output a warning for the last join operand since the size
// will never goes to joinEmitInterval.
InspectableObject io = new InspectableObject();
keyField.evaluate(row, rowInspector, io);
- LOG.warn("table " + alias
- + " has more than joinEmitInterval rows for join key " + io.o);
+ LOG.warn("table " + alias + " has " + sz + " rows for join key " + io.o);
+ nextSz = getNextSize(nextSz);
}
}