You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/01/29 01:29:23 UTC

svn commit: r904321 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java

Author: namit
Date: Fri Jan 29 00:29:22 2010
New Revision: 904321

URL: http://svn.apache.org/viewvc?rev=904321&view=rev
Log:
HIVE-1110. Add a counter to denote that skewjoin got triggered
(He Yongqiang via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=904321&r1=904320&r2=904321&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jan 29 00:29:22 2010
@@ -95,6 +95,9 @@
     input size of skew join's following map join job
     (He Yongqiang via namit)
 
+    HIVE-1110. Add a counter to denote that skewjoin got triggered
+    (He Yongqiang via namit)
+
 Release 0.5.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=904321&r1=904320&r2=904321&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Jan 29 00:29:22 2010
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.LongWritable;
 
 /**
  * Join operator implementation.
@@ -41,7 +42,12 @@
   private static final long serialVersionUID = 1L;
 
   private transient SkewJoinHandler skewJoinKeyContext = null;
-
+  
+  public static enum SkewkeyTableCounter {
+    SKEWJOINFOLLOWUPJOBS
+  }
+  transient private final LongWritable skewjoin_followup_jobs = new LongWritable(0);
+  
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
@@ -49,7 +55,9 @@
     if (handleSkewJoin) {
       skewJoinKeyContext = new SkewJoinHandler(this);
       skewJoinKeyContext.initiliaze(hconf);
+      skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
     }
+    statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
   }
 
   @Override

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=904321&r1=904320&r2=904321&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Fri Jan 29 00:29:22 2010
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -85,6 +86,8 @@
   private Map<Byte, TableDesc> tblDesc = null;
 
   private Map<Byte, Boolean> bigKeysExistingMap = null;
+  
+  private LongWritable skewjoinFollowupJobs;
 
   Configuration hconf = null;
   List<Object> dummyKey = null;
@@ -226,7 +229,7 @@
       // table (the last table can always be streamed), we define that we get
       // a skew key now.
       currBigKeyTag = tag;
-
+      updateSkewJoinJobCounter(tag);
       // right now we assume that the group by is an ArrayList object. It may
       // change in future.
       if (!(dummyKey instanceof List)) {
@@ -342,4 +345,12 @@
     return new Path(tmpPath, taskId);
   }
 
+  public void setSkewJoinJobCounter(LongWritable skewjoinFollowupJobs) {
+    this.skewjoinFollowupJobs = skewjoinFollowupJobs;
+  }
+  
+  public void updateSkewJoinJobCounter(int tag) {
+    this.skewjoinFollowupJobs.set(this.skewjoinFollowupJobs.get()+1);
+  }
+
 }