You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text version.
Posted to commits@hive.apache.org by se...@apache.org on 2014/03/25 00:24:27 UTC

svn commit: r1581111 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/optimizer/physical/ java/org/apache/hadoop/hive/ql/plan/ test/queries/clientpositive/ test/results/clientpositive/

Author: sershe
Date: Mon Mar 24 23:24:18 2014
New Revision: 1581111

URL: http://svn.apache.org/r1581111
Log:
HIVE-6682 : nonstaged mapjoin table memory check may be broken (Sergey Shelukhin, reviewed by Navis)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/mapjoin_memcheck.q
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_memcheck.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Mon Mar 24 23:24:18 2014
@@ -56,7 +56,7 @@ import org.apache.hadoop.util.Reflection
 public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> implements
     Serializable {
   private static final long serialVersionUID = 1L;
-  private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
+  protected static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
   /**
    * The expressions for join inputs's join keys.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TemporaryHashSinkOperator.java Mon Mar 24 23:24:18 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -27,6 +29,12 @@ import java.io.IOException;
 public class TemporaryHashSinkOperator extends HashTableSinkOperator {
   public TemporaryHashSinkOperator(MapJoinDesc desc) {
     conf = new HashTableSinkDesc(desc);
+
+    // Sanity check the config.
+    assert conf.getHashtableMemoryUsage() != 0;
+    if (conf.getHashtableMemoryUsage() == 0) {
+      LOG.error("Hash table memory usage not set in map join operator; non-staged load may fail");
+    }
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java Mon Mar 24 23:24:18 2014
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
  * OOM in group by operator.
  */
 public final class LocalMapJoinProcFactory {
+  private static final Log LOG = LogFactory.getLog(LocalMapJoinProcFactory.class);
 
   public static NodeProcessor getJoinProc() {
     return new LocalMapJoinProcessor();
@@ -133,6 +136,9 @@ public final class LocalMapJoinProcFacto
         hashtableMemoryUsage = conf.getFloatVar(
             HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
       }
+      mapJoinOp.getConf().setHashTableMemoryUsage(hashtableMemoryUsage);
+      LOG.info("Setting max memory usage to " + hashtableMemoryUsage + " for table sink "
+          + (context.isFollowedByGroupBy() ? "" : "not") + " followed by group by");
       hashTableSinkOp.getConf().setHashtableMemoryUsage(hashtableMemoryUsage);
 
       // get the last operator for processing big tables

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java Mon Mar 24 23:24:18 2014
@@ -114,6 +114,7 @@ public class HashTableSinkDesc extends J
     this.retainList = clone.getRetainList();
     this.dumpFilePrefix = clone.getDumpFilePrefix();
     this.bucketMapjoinContext = new BucketMapJoinContext(clone);
+    this.hashtableMemoryUsage = clone.getHashTableMemoryUsage();
   }
 
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Mon Mar 24 23:24:18 2014
@@ -61,6 +61,9 @@ public class MapJoinDesc extends JoinDes
   // flag for bucket map join. One usage is to set BucketizedHiveInputFormat
   private boolean isBucketMapJoin;
 
+  // Hash table memory usage allowed; used in case of non-staged mapjoin.
+  private float hashtableMemoryUsage;
+
   public MapJoinDesc() {
     bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
   }
@@ -269,4 +272,12 @@ public class MapJoinDesc extends JoinDes
   public void setBucketMapJoin(boolean isBucketMapJoin) {
     this.isBucketMapJoin = isBucketMapJoin;
   }
+
+  public void setHashTableMemoryUsage(float hashtableMemoryUsage) {
+    this.hashtableMemoryUsage = hashtableMemoryUsage;
+  }
+
+  public float getHashTableMemoryUsage() {
+    return hashtableMemoryUsage;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/mapjoin_memcheck.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/mapjoin_memcheck.q?rev=1581111&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/mapjoin_memcheck.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/mapjoin_memcheck.q Mon Mar 24 23:24:18 2014
@@ -0,0 +1,16 @@
+
+set hive.auto.convert.join = true;
+
+create table src0 like src;
+insert into table src0 select * from src where src.key < 10;
+
+set hive.mapjoin.check.memory.rows=1;
+
+explain 
+select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1;
+
+select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1;
+
+drop table src0;
\ No newline at end of file

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out?rev=1581111&r1=1581110&r2=1581111&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out Mon Mar 24 23:24:18 2014 differ

Added: hive/trunk/ql/src/test/results/clientpositive/mapjoin_memcheck.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/mapjoin_memcheck.q.out?rev=1581111&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/mapjoin_memcheck.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/mapjoin_memcheck.q.out Mon Mar 24 23:24:18 2014
@@ -0,0 +1,128 @@
+PREHOOK: query: create table src0 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: create table src0 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src0
+PREHOOK: query: insert into table src0 select * from src where src.key < 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src0
+POSTHOOK: query: insert into table src0 select * from src where src.key < 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@src0
+POSTHOOK: Lineage: src0.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src0.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: explain 
+select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1
+PREHOOK: type: QUERY
+POSTHOOK: query: explain 
+select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: src0.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src0.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-2
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src2
+            Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              keys:
+                0 key (type: string)
+                1 key (type: string)
+              outputColumnNames: _col0, _col1, _col4, _col5
+              Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: _col0 (type: string), _col1 (type: string), _col4 (type: string), _col5 (type: string)
+                outputColumnNames: _col0, _col1, _col2, _col3
+                Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string), _col1 (type: string)
+                  sort order: ++
+                  Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+      Local Work:
+        Map Reduce Local Work
+          Alias -> Map Local Tables:
+            src1 
+              Fetch Operator
+                limit: -1
+          Alias -> Map Local Operator Tree:
+            src1 
+              TableScan
+                alias: src1
+                Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Extract
+          Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+PREHOOK: query: select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src0
+#### A masked pattern was here ####
+POSTHOOK: query: select src1.key as k1, src1.value as v1, src2.key, src2.value
+from src0 src1 inner join src0 src2 on src1.key = src2.key order by k1, v1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src0
+#### A masked pattern was here ####
+POSTHOOK: Lineage: src0.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src0.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+0	val_0	0	val_0
+2	val_2	2	val_2
+4	val_4	4	val_4
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+5	val_5	5	val_5
+8	val_8	8	val_8
+9	val_9	9	val_9
+PREHOOK: query: drop table src0
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@src0
+PREHOOK: Output: default@src0
+POSTHOOK: query: drop table src0
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@src0
+POSTHOOK: Output: default@src0
+POSTHOOK: Lineage: src0.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src0.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]