You are viewing a plain-text version of this content. (The hyperlink to the canonical archive page was lost when this page was converted to plain text.)
Posted to commits@hive.apache.org by he...@apache.org on 2010/07/09 18:24:53 UTC

svn commit: r962594 - in /hadoop/hive/trunk: ./ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/

Author: heyongqiang
Date: Fri Jul  9 16:24:52 2010
New Revision: 962594

URL: http://svn.apache.org/viewvc?rev=962594&view=rev
Log:
HIVE-1305. add progress in join and groupby. (Siying Dong via He Yongqiang)

Added:
    hadoop/hive/trunk/data/files/kv6.txt
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jul  9 16:24:52 2010
@@ -34,6 +34,9 @@ Trunk -  Unreleased
     HIVE-1454. insert overwrite and CTAS fail in hive local mode
     (Joydeep Sen Sarma via He Yongqiang )
 
+    HIVE-1305. add progress in join and groupby
+    (Siying Dong via He Yongqiang)
+
 Release 0.6.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Added: hadoop/hive/trunk/data/files/kv6.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/kv6.txt?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/data/files/kv6.txt (added)
+++ hadoop/hive/trunk/data/files/kv6.txt Fri Jul  9 16:24:52 2010
@@ -0,0 +1,100 @@
+00
+01
+02
+03
+04
+05
+06
+07
+08
+09
+010
+011
+012
+013
+014
+015
+016
+017
+018
+019
+020
+021
+022
+023
+024
+025
+026
+027
+028
+029
+030
+031
+032
+033
+034
+035
+036
+037
+038
+039
+040
+041
+042
+043
+044
+045
+046
+047
+048
+049
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Jul  9 16:24:52 2010
@@ -67,7 +67,6 @@ public abstract class AbstractMapJoinOpe
       };
 
   transient boolean firstRow;
-  transient int heartbeatInterval;
 
   public AbstractMapJoinOperator() {
   }
@@ -82,8 +81,6 @@ public abstract class AbstractMapJoinOpe
 
     numMapRowsRead = 0;
     firstRow = true;
-    heartbeatInterval = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVESENDHEARTBEAT);
 
     joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
 
@@ -129,14 +126,6 @@ public abstract class AbstractMapJoinOpe
         + FATAL_ERR_MSG[(int) counterCode]);
   }
 
-  protected void reportProgress() {
-    // Send some status periodically
-    numMapRowsRead++;
-    if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
-      reporter.progress();
-    }
-  }
-
   @Override
   public int getType() {
     return OperatorType.MAPJOIN;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Jul  9 16:24:52 2010
@@ -24,8 +24,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,8 +44,8 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -140,6 +140,9 @@ public abstract class CommonJoinOperator
 
   transient boolean handleSkewJoin = false;
 
+  protected transient int countAfterReport;
+  protected transient int heartbeatInterval;
+
   public CommonJoinOperator() {
   }
 
@@ -266,6 +269,11 @@ public abstract class CommonJoinOperator
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.handleSkewJoin = conf.getHandleSkewJoin();
     this.hconf = hconf;
+
+    heartbeatInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+    countAfterReport = 0;
+
     totalSz = 0;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -480,7 +488,9 @@ public abstract class CommonJoinOperator
         }
       }
     }
+
     forward(forwardCache, outputObjInspector);
+    countAfterReport = 0;
   }
 
   private void copyOldArray(boolean[] src, boolean[] dest) {
@@ -816,6 +826,7 @@ public abstract class CommonJoinOperator
       }
 
       forward(forwardCache, outputObjInspector);
+      countAfterReport = 0;
       return;
     }
 
@@ -878,6 +889,17 @@ public abstract class CommonJoinOperator
     }
   }
 
+  protected void reportProgress() {
+    // Send some status periodically
+    countAfterReport++;
+
+    if ((countAfterReport % heartbeatInterval) == 0
+        && (reporter != null)) {
+      reporter.progress();
+      countAfterReport = 0;
+    }
+  }
+
   /**
    * All done.
    *

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Jul  9 16:24:52 2010
@@ -151,6 +151,8 @@ public class GroupByOperator extends Ope
   transient int totalVariableSize;
   transient int numEntriesVarSize;
   transient int numEntriesHashTable;
+  transient int countAfterReport;
+  transient int heartbeatInterval;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -158,6 +160,10 @@ public class GroupByOperator extends Ope
     numRowsInput = 0;
     numRowsHashTbl = 0;
 
+    heartbeatInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+    countAfterReport = 0;
+
     assert (inputObjInspectors.length == 1);
     ObjectInspector rowInspector = inputObjInspectors[0];
 
@@ -291,7 +297,7 @@ public class GroupByOperator extends Ope
    * the total amount of memory to be used by the map-side hash. By default, all
    * available memory is used. The size of each row is estimated, rather
    * crudely, and the number of entries are figure out based on that.
-   * 
+   *
    * @return number of entries that can fit in hash table - useful for map-side
    *         aggregation only
    **/
@@ -311,7 +317,7 @@ public class GroupByOperator extends Ope
    * datatype is of variable length, STRING, a list of such key positions is
    * maintained, and the size for such positions is then actually calculated at
    * runtime.
-   * 
+   *
    * @param pos
    *          the position of the key
    * @param c
@@ -342,7 +348,7 @@ public class GroupByOperator extends Ope
    * field is of variable length, STRING, a list of such field names for the
    * field position is maintained, and the size for such positions is then
    * actually calculated at runtime.
-   * 
+   *
    * @param pos
    *          the position of the key
    * @param c
@@ -454,15 +460,15 @@ public class GroupByOperator extends Ope
    * whether it has changed. As a cleanup, the lastInvoke logic can be pushed in
    * the caller, and this function can be independent of that. The client should
    * always notify whether it is a different row or not.
-   * 
+   *
    * @param aggs the aggregations to be evaluated
-   * 
+   *
    * @param row the row being processed
-   * 
+   *
    * @param rowInspector the inspector for the row
-   * 
+   *
    * @param hashAggr whether hash aggregation is being performed or not
-   * 
+   *
    * @param newEntryForHashAggr only valid if it is a hash aggregation, whether
    * it is a new entry or not
    */
@@ -545,6 +551,8 @@ public class GroupByOperator extends Ope
     }
 
     try {
+      countAfterReport++;
+
       // Compute the keys
       newKeys.clear();
       for (int i = 0; i < keyFields.length; i++) {
@@ -562,6 +570,12 @@ public class GroupByOperator extends Ope
       }
 
       firstRowInGroup = false;
+
+      if (countAfterReport != 0 && (countAfterReport % heartbeatInterval) == 0
+          && (reporter != null)) {
+        reporter.progress();
+        countAfterReport = 0;
+      }
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
@@ -695,6 +709,7 @@ public class GroupByOperator extends Ope
     // Forward the current keys if needed for sort-based aggregation
     if (currentKeys != null && !keysAreEqual) {
       forward(currentKeys, aggregations);
+      countAfterReport = 0;
     }
 
     // Need to update the keys?
@@ -724,7 +739,7 @@ public class GroupByOperator extends Ope
 
   /**
    * Based on user-parameters, should the hash table be flushed.
-   * 
+   *
    * @param newKeys
    *          keys for the row under consideration
    **/
@@ -783,6 +798,8 @@ public class GroupByOperator extends Ope
 
   private void flush(boolean complete) throws HiveException {
 
+    countAfterReport = 0;
+
     // Currently, the algorithm flushes 10% of the entries - this can be
     // changed in the future
 
@@ -820,7 +837,7 @@ public class GroupByOperator extends Ope
 
   /**
    * Forward a record of keys and aggregation results.
-   * 
+   *
    * @param keys
    *          The keys in the record
    * @throws HiveException
@@ -843,7 +860,7 @@ public class GroupByOperator extends Ope
 
   /**
    * We need to forward all the aggregations to children.
-   * 
+   *
    */
   @Override
   public void closeOp(boolean abort) throws HiveException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Jul  9 16:24:52 2010
@@ -68,6 +68,7 @@ public class JoinOperator extends Common
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
+      reportProgress();
 
       // get alias
       alias = (byte) tag;
@@ -116,7 +117,6 @@ public class JoinOperator extends Common
 
       // Add the value to the vector
       storage.get(alias).add(nr);
-
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Jul  9 16:24:52 2010
@@ -160,11 +160,11 @@ public class MapJoinOperator extends Abs
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    
+
     if (tag == posBigTable) {
       this.getExecContext().processInputFileChangeForLocalWork();
     }
-    
+
     try {
       // get alias
       alias = (byte) tag;
@@ -201,6 +201,7 @@ public class MapJoinOperator extends Abs
         }
 
         reportProgress();
+        numMapRowsRead++;
 
         if ((numMapRowsRead > maxMapJoinSize) && (reporter != null)
             && (counterNameToEnum != null)) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=962594&r1=962593&r2=962594&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Jul  9 16:24:52 2010
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
@@ -64,8 +64,8 @@ public class SMBMapJoinOperator extends 
   HashMap<Byte, RowContainer<ArrayList<Object>>> candidateStorage;
 
   transient HashMap<Byte, String> tagToAlias;
-  private transient HashMap<Byte, Boolean> fetchOpDone = new HashMap<Byte, Boolean>();
-  private transient HashMap<Byte, Boolean> foundNextKeyGroup = new HashMap<Byte, Boolean>();
+  private transient final HashMap<Byte, Boolean> fetchOpDone = new HashMap<Byte, Boolean>();
+  private transient final HashMap<Byte, Boolean> foundNextKeyGroup = new HashMap<Byte, Boolean>();
   transient boolean firstFetchHappened = false;
   transient boolean localWorkInited = false;
 
@@ -125,8 +125,8 @@ public class SMBMapJoinOperator extends 
     localWorkInited = true;
     this.localWork = localWork;
     fetchOperators = new HashMap<String, FetchOperator>();
-    
-    Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>(); 
+
+    Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
     // create map local operators
     for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
         .entrySet()) {
@@ -137,7 +137,7 @@ public class SMBMapJoinOperator extends 
         ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
         if (list != null) {
           ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
-        }  
+        }
       } else {
         ColumnProjectionUtils.setFullyReadColumns(jobClone);
       }
@@ -213,6 +213,7 @@ public class SMBMapJoinOperator extends 
     }
 
     reportProgress();
+    numMapRowsRead++;
 
     // the big table has reached a new key group. try to let the small tables
     // catch up with the big table.
@@ -256,6 +257,7 @@ public class SMBMapJoinOperator extends 
         break;
       }
       reportProgress();
+      numMapRowsRead++;
       allFetchOpDone = allFetchOpDone();
     }
 

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/progress_1.q Fri Jul  9 16:24:52 2010
@@ -0,0 +1,9 @@
+set hive.heartbeat.interval=5; 
+
+DROP TABLE PROGRESS_1;
+CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1;
+
+select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key;
+
+DROP TABLE PROGRESS_1;

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out?rev=962594&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/progress_1.q.out Fri Jul  9 16:24:52 2010
@@ -0,0 +1,28 @@
+PREHOOK: query: DROP TABLE PROGRESS_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE PROGRESS_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE PROGRESS_1(key int, value string) STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@PROGRESS_1
+PREHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../data/files/kv6.txt' INTO TABLE PROGRESS_1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@progress_1
+PREHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@progress_1
+PREHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000
+POSTHOOK: query: select count(1) from PROGRESS_1 t1 join PROGRESS_1 t2 on t1.key=t2.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@progress_1
+POSTHOOK: Output: file:/tmp/sdong/hive_2010-07-08_16-37-28_683_2518541288555731352/10000
+5000
+PREHOOK: query: DROP TABLE PROGRESS_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE PROGRESS_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@progress_1