You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2010/07/09 18:24:53 UTC
svn commit: r962594 - in /hadoop/hive/trunk: ./ data/files/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
Author: heyongqiang
Date: Fri Jul 9 16:24:52 2010
New Revision: 962594
URL: http://svn.apache.org/viewvc?rev=962594&view=rev
Log:
HIVE-1305. add progress in join and groupby. (Siying Dong via He Yongqiang)
Added:
hadoop/hive/trunk/data/files/kv6.txt
hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jul 9 16:24:52 2010
@@ -34,6 +34,9 @@ Trunk - Unreleased
HIVE-1454. insert overwrite and CTAS fail in hive local mode
(Joydeep Sen Sarma via He Yongqiang )
+ HIVE-1305. add progress in join and groupby
+ (Siying Dong via He Yongqiang)
+
Release 0.6.0 - Unreleased
INCOMPATIBLE CHANGES
Added: hadoop/hive/trunk/data/files/kv6.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/kv6.txt?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/data/files/kv6.txt (added)
+++ hadoop/hive/trunk/data/files/kv6.txt Fri Jul 9 16:24:52 2010
@@ -0,0 +1,100 @@
+00
+01
+02
+03
+04
+05
+06
+07
+08
+09
+010
+011
+012
+013
+014
+015
+016
+017
+018
+019
+020
+021
+022
+023
+024
+025
+026
+027
+028
+029
+030
+031
+032
+033
+034
+035
+036
+037
+038
+039
+040
+041
+042
+043
+044
+045
+046
+047
+048
+049
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Jul 9 16:24:52 2010
@@ -67,7 +67,6 @@ public abstract class AbstractMapJoinOpe
};
transient boolean firstRow;
- transient int heartbeatInterval;
public AbstractMapJoinOperator() {
}
@@ -82,8 +81,6 @@ public abstract class AbstractMapJoinOpe
numMapRowsRead = 0;
firstRow = true;
- heartbeatInterval = HiveConf.getIntVar(hconf,
- HiveConf.ConfVars.HIVESENDHEARTBEAT);
joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
@@ -129,14 +126,6 @@ public abstract class AbstractMapJoinOpe
+ FATAL_ERR_MSG[(int) counterCode]);
}
- protected void reportProgress() {
- // Send some status periodically
- numMapRowsRead++;
- if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
- reporter.progress();
- }
- }
-
@Override
public int getType() {
return OperatorType.MAPJOIN;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Jul 9 16:24:52 2010
@@ -24,8 +24,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,8 +44,8 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
@@ -140,6 +140,9 @@ public abstract class CommonJoinOperator
transient boolean handleSkewJoin = false;
+ protected transient int countAfterReport;
+ protected transient int heartbeatInterval;
+
public CommonJoinOperator() {
}
@@ -266,6 +269,11 @@ public abstract class CommonJoinOperator
protected void initializeOp(Configuration hconf) throws HiveException {
this.handleSkewJoin = conf.getHandleSkewJoin();
this.hconf = hconf;
+
+ heartbeatInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVESENDHEARTBEAT);
+ countAfterReport = 0;
+
totalSz = 0;
// Map that contains the rows for each alias
storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -480,7 +488,9 @@ public abstract class CommonJoinOperator
}
}
}
+
forward(forwardCache, outputObjInspector);
+ countAfterReport = 0;
}
private void copyOldArray(boolean[] src, boolean[] dest) {
@@ -816,6 +826,7 @@ public abstract class CommonJoinOperator
}
forward(forwardCache, outputObjInspector);
+ countAfterReport = 0;
return;
}
@@ -878,6 +889,17 @@ public abstract class CommonJoinOperator
}
}
+ protected void reportProgress() {
+ // Send some status periodically
+ countAfterReport++;
+
+ if ((countAfterReport % heartbeatInterval) == 0
+ && (reporter != null)) {
+ reporter.progress();
+ countAfterReport = 0;
+ }
+ }
+
/**
* All done.
*
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Jul 9 16:24:52 2010
@@ -151,6 +151,8 @@ public class GroupByOperator extends Ope
transient int totalVariableSize;
transient int numEntriesVarSize;
transient int numEntriesHashTable;
+ transient int countAfterReport;
+ transient int heartbeatInterval;
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -158,6 +160,10 @@ public class GroupByOperator extends Ope
numRowsInput = 0;
numRowsHashTbl = 0;
+ heartbeatInterval = HiveConf.getIntVar(hconf,
+ HiveConf.ConfVars.HIVESENDHEARTBEAT);
+ countAfterReport = 0;
+
assert (inputObjInspectors.length == 1);
ObjectInspector rowInspector = inputObjInspectors[0];
@@ -291,7 +297,7 @@ public class GroupByOperator extends Ope
* the total amount of memory to be used by the map-side hash. By default, all
* available memory is used. The size of each row is estimated, rather
* crudely, and the number of entries are figure out based on that.
- *
+ *
* @return number of entries that can fit in hash table - useful for map-side
* aggregation only
**/
@@ -311,7 +317,7 @@ public class GroupByOperator extends Ope
* datatype is of variable length, STRING, a list of such key positions is
* maintained, and the size for such positions is then actually calculated at
* runtime.
- *
+ *
* @param pos
* the position of the key
* @param c
@@ -342,7 +348,7 @@ public class GroupByOperator extends Ope
* field is of variable length, STRING, a list of such field names for the
* field position is maintained, and the size for such positions is then
* actually calculated at runtime.
- *
+ *
* @param pos
* the position of the key
* @param c
@@ -454,15 +460,15 @@ public class GroupByOperator extends Ope
* whether it has changed. As a cleanup, the lastInvoke logic can be pushed in
* the caller, and this function can be independent of that. The client should
* always notify whether it is a different row or not.
- *
+ *
* @param aggs the aggregations to be evaluated
- *
+ *
* @param row the row being processed
- *
+ *
* @param rowInspector the inspector for the row
- *
+ *
* @param hashAggr whether hash aggregation is being performed or not
- *
+ *
* @param newEntryForHashAggr only valid if it is a hash aggregation, whether
* it is a new entry or not
*/
@@ -545,6 +551,8 @@ public class GroupByOperator extends Ope
}
try {
+ countAfterReport++;
+
// Compute the keys
newKeys.clear();
for (int i = 0; i < keyFields.length; i++) {
@@ -562,6 +570,12 @@ public class GroupByOperator extends Ope
}
firstRowInGroup = false;
+
+ if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0
+ && (reporter != null)) {
+ reporter.progress();
+ countAfterReport = 0;
+ }
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -695,6 +709,7 @@ public class GroupByOperator extends Ope
// Forward the current keys if needed for sort-based aggregation
if (currentKeys != null && !keysAreEqual) {
forward(currentKeys, aggregations);
+ countAfterReport = 0;
}
// Need to update the keys?
@@ -724,7 +739,7 @@ public class GroupByOperator extends Ope
/**
* Based on user-parameters, should the hash table be flushed.
- *
+ *
* @param newKeys
* keys for the row under consideration
**/
@@ -783,6 +798,8 @@ public class GroupByOperator extends Ope
private void flush(boolean complete) throws HiveException {
+ countAfterReport = 0;
+
// Currently, the algorithm flushes 10% of the entries - this can be
// changed in the future
@@ -820,7 +837,7 @@ public class GroupByOperator extends Ope
/**
* Forward a record of keys and aggregation results.
- *
+ *
* @param keys
* The keys in the record
* @throws HiveException
@@ -843,7 +860,7 @@ public class GroupByOperator extends Ope
/**
* We need to forward all the aggregations to children.
- *
+ *
*/
@Override
public void closeOp(boolean abort) throws HiveException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Jul 9 16:24:52 2010
@@ -68,6 +68,7 @@ public class JoinOperator extends Common
@Override
public void processOp(Object row, int tag) throws HiveException {
try {
+ reportProgress();
// get alias
alias = (byte) tag;
@@ -116,7 +117,6 @@ public class JoinOperator extends Common
// Add the value to the vector
storage.get(alias).add(nr);
-
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Jul 9 16:24:52 2010
@@ -160,11 +160,11 @@ public class MapJoinOperator extends Abs
@Override
public void processOp(Object row, int tag) throws HiveException {
-
+
if (tag == posBigTable) {
this.getExecContext().processInputFileChangeForLocalWork();
}
-
+
try {
// get alias
alias = (byte) tag;
@@ -201,6 +201,7 @@ public class MapJoinOperator extends Abs
}
reportProgress();
+ numMapRowsRead++;
if ((numMapRowsRead > maxMapJoinSize) && (reporter != null)
&& (counterNameToEnum != null)) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Jul 9 16:24:52 2010
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
@@ -64,8 +64,8 @@ public class SMBMapJoinOperator extends
HashMap<Byte, RowContainer<ArrayList<Object>>> candidateStorage;
transient HashMap<Byte, String> tagToAlias;
- private transient HashMap<Byte, Boolean> fetchOpDone = new HashMap<Byte, Boolean>();
- private transient HashMap<Byte, Boolean> foundNextKeyGroup = new HashMap<Byte, Boolean>();
+ private transient final HashMap<Byte, Boolean> fetchOpDone = new HashMap<Byte, Boolean>();
+ private transient final HashMap<Byte, Boolean> foundNextKeyGroup = new HashMap<Byte, Boolean>();
transient boolean firstFetchHappened = false;
transient boolean localWorkInited = false;
@@ -125,8 +125,8 @@ public class SMBMapJoinOperator extends
localWorkInited = true;
this.localWork = localWork;
fetchOperators = new HashMap<String, FetchOperator>();
-
- Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
+
+ Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
// create map local operators
for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
.entrySet()) {
@@ -137,7 +137,7 @@ public class SMBMapJoinOperator extends
ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
if (list != null) {
ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
- }
+ }
} else {
ColumnProjectionUtils.setFullyReadColumns(jobClone);
}
@@ -213,6 +213,7 @@ public class SMBMapJoinOperator extends
}
reportProgress();
+ numMapRowsRead++;
// the big table has reached a new key group. try to let the small tables
// catch up with the big table.
@@ -256,6 +257,7 @@ public class SMBMapJoinOperator extends
break;
}
reportProgress();
+ numMapRowsRead++;
allFetchOpDone = allFetchOpDone();
}
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q Fri Jul 9 16:24:52 2010
@@ -0,0 +1,9 @@
+set hive.heartbeat.interval=5;
+
+DROP TABLE PROGRESS_1;
+CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1;
+
+select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key;
+
+DROP TABLE PROGRESS_1;
Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out Fri Jul 9 16:24:52 2010
@@ -0,0 +1,28 @@
+PREHOOK: query: DROP TABLE PROGRESS_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE PROGRESS_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@PROGRESS_1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@progress_1
+PREHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@progress_1
+PREHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000
+POSTHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@progress_1
+POSTHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000
+5000
+PREHOOK: query: DROP TABLE PROGRESS_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE PROGRESS_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@progress_1