Posted to commits@hive.apache.org by sd...@apache.org on 2011/07/26 19:13:30 UTC

svn commit: r1151175 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/parse/ test/org/apache/hadoop/hive/ql/hooks/ test/queries/clientpositive/ test/results/clientpositive/

Author: sdong
Date: Tue Jul 26 17:13:29 2011
New Revision: 1151175

URL: http://svn.apache.org/viewvc?rev=1151175&view=rev
Log:
HIVE-2282. Local mode needs to work well with block sampling (Kevin Wilfong via Siying Dong)
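
Context: local-mode auto-selection (hive.exec.mode.local.auto) compares a job's total input size and file count against hive.exec.mode.local.auto.inputbytes.max and hive.exec.mode.local.auto.tasks.max. Before this patch the raw input summary was used, so a query with percentage block sampling (e.g. TABLESAMPLE(1 PERCENT)) over a large table never qualified, even though it reads only a sliver of the data. The patch scales both figures by the highest sampling percentage before the check. Rough arithmetic, assuming the stock 128 MB byte limit: a 1 percent sample of 6 GB is estimated at about 60 MB, well under the limit, so the job can run locally.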

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
    hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
    hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=1151175&r1=1151174&r2=1151175&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Tue Jul 26 17:13:29 2011
@@ -60,6 +60,10 @@ public class MapRedTask extends ExecDriv
   private transient ContentSummary inputSummary = null;
   private transient boolean runningViaChild = false;
 
+  private transient boolean inputSizeEstimated = false;
+  private transient long totalInputFileSize;
+  private transient long totalInputNumFiles;
+
   public MapRedTask() {
     super();
   }
@@ -91,16 +95,21 @@ public class MapRedTask extends ExecDriv
           inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
         }
 
+        // set the values of totalInputFileSize and totalInputNumFiles, estimating them
+        // if percentage block sampling is being used
+        estimateInputSize();
+
         // at this point the number of reducers is precisely defined in the plan
         int numReducers = work.getNumReduceTasks();
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Task: " + getId() + ", Summary: " +
-                    inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+                    totalInputFileSize + "," + totalInputNumFiles + ","
                     + numReducers);
         }
 
-        String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+        String reason = MapRedTask.isEligibleForLocalMode(conf, numReducers,
+            totalInputFileSize, totalInputNumFiles);
         if (reason == null) {
           // clone configuration before modifying it on per-task basis
           cloneConf();
@@ -366,9 +375,50 @@ public class MapRedTask extends ExecDriv
       inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work, null);
     }
 
-    long totalInputFileSize = inputSummary.getLength();
-
     // if all inputs are sampled, we should shrink the number of reducers accordingly.
+    estimateInputSize();
+
+    if (totalInputFileSize != inputSummary.getLength()) {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+    } else {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+    }
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+    return reducers;
+  }
+
+  /**
+   * Sets the values of totalInputFileSize and totalInputNumFiles.  If percentage
+   * block sampling is used, these values are estimated by multiplying the values
+   * obtained from the input summary by the highest percentage being used for
+   * sampling.  Otherwise, they are set to the exact values obtained from the
+   * input summary.
+   *
+   * Once the function completes, inputSizeEstimated is set so that the logic is
+   * never run more than once.
+   */
+  private void estimateInputSize() {
+    if (inputSizeEstimated) {
+      // If we've already run this function, return
+      return;
+    }
+
+    // Initialize the values to be those taken from the input summary
+    totalInputFileSize = inputSummary.getLength();
+    totalInputNumFiles = inputSummary.getFileCount();
+
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      inputSizeEstimated = true;
+      return;
+    }
+
+    // if all inputs are sampled, we should shrink the size of the input accordingly
     double highestSamplePercentage = 0;
     boolean allSample = false;
     for (String alias : work.getAliasToWork().keySet()) {
@@ -385,42 +435,38 @@ public class MapRedTask extends ExecDriv
     }
     if (allSample) {
      // This is a little bit dangerous if the inputs turn out not to be sampled.
-      // In that case, we significantly underestimate number of reducers.
-      // It's the same as other cases of estimateNumberOfReducers(). It's just our best
+      // In that case, we significantly underestimate the input.
+      // It's the same as estimateNumberOfReducers(). It's just our best
       // guess and there is no guarantee.
       totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
           , totalInputFileSize);
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
-    } else {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+          , totalInputNumFiles);
     }
 
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-    return reducers;
+    inputSizeEstimated = true;
   }
 
   /**
    * Find out if a job can be run in local mode based on its characteristics
    *
    * @param conf Hive Configuration
-   * @param inputSummary summary about the input files for this job
    * @param numReducers total number of reducers for this job
+   * @param inputLength the total size of the input, in bytes
+   * @param inputFileCount the number of input files
    * @return String null if job is eligible for local mode, reason otherwise
    */
   public static String isEligibleForLocalMode(HiveConf conf,
-                                              ContentSummary inputSummary,
-                                              int numReducers) {
+                                              int numReducers,
+                                              long inputLength,
+                                              long inputFileCount) {
 
     long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
     long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
 
     // check for max input size
-    if (inputSummary.getLength() > maxBytes) {
-      return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+    if (inputLength > maxBytes) {
+      return "Input Size (= " + inputLength + ") is larger than " +
         HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
     }
 
@@ -428,8 +474,8 @@ public class MapRedTask extends ExecDriv
     // in the absence of an easy way to get the number of splits - do this
    // based on the total number of files (pessimistically assuming that
     // splits are equal to number of files in worst case)
-    if (inputSummary.getFileCount() > maxTasks) {
-      return "Number of Input Files (= " + inputSummary.getFileCount() +
+    if (inputFileCount > maxTasks) {
+      return "Number of Input Files (= " + inputFileCount +
         ") is larger than " +
        HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + " (= " + maxTasks + ")";
     }
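
For reference, a minimal standalone sketch of the estimation rule that estimateInputSize() applies; the class and parameter names below are hypothetical, not part of the patch:

// Hypothetical helper mirroring the sampling-aware estimate above.
public final class SampledInputEstimate {
  /**
   * Returns {estimatedLength, estimatedFileCount}. When every input alias is
   * covered by percentage block sampling, both summary values are scaled by
   * the highest sampling percentage; otherwise they pass through unchanged.
   */
  public static long[] estimate(long summaryLength, long summaryFileCount,
      boolean allInputsSampled, double highestSamplePercentage) {
    if (!allInputsSampled) {
      return new long[] {summaryLength, summaryFileCount};
    }
    // Scale by the sample percentage, never exceeding the exact summary values.
    long length = Math.min(
        (long) (summaryLength * highestSamplePercentage / 100D), summaryLength);
    long files = Math.min(
        (long) (summaryFileCount * highestSamplePercentage / 100D), summaryFileCount);
    return new long[] {length, files};
  }
}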

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1151175&r1=1151174&r2=1151175&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Jul 26 17:13:29 2011
@@ -7997,7 +7997,8 @@ public class SemanticAnalyzer extends Ba
                    + numReducers);
         }
 
-        if(MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers) != null) {
+        if(MapRedTask.isEligibleForLocalMode(conf, numReducers,
+            inputSummary.getLength(), inputSummary.getFileCount()) != null) {
           hasNonLocalJob = true;
           break;
         }else{
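
An illustrative call against the new isEligibleForLocalMode signature (conf is a HiveConf; the numeric arguments are made-up stand-ins for a real input summary):

// Hypothetical usage; a null return means the job fits local mode.
String reason = MapRedTask.isEligibleForLocalMode(conf, 1 /* numReducers */,
    600000L /* inputLength */, 3L /* inputFileCount */);
if (reason == null) {
  // eligible: the job can be run with mapred.job.tracker set to local
} else {
  LOG.info("Cannot run in local mode: " + reason);
}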

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/hooks/VerifyIsLocalModeHook.java Tue Jul 26 17:13:29 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+
+public class VerifyIsLocalModeHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    if (hookContext.getHookType().equals(HookType.POST_EXEC_HOOK)) {
+      List<TaskRunner> taskRunners = hookContext.getCompleteTaskList();
+      for (TaskRunner taskRunner : taskRunners) {
+        Task task = taskRunner.getTask();
+        if (task.isMapRedTask()) {
+          Assert.assertTrue("VerifyIsLocalModeHook fails because isLocalMode was not set for a task.",
+              task.isLocalMode());
+        }
+      }
+    }
+  }
+}
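
Note on wiring: the test below registers this hook through hive.exec.post.hooks, i.e.

set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyIsLocalModeHook;

and points mapred.job.tracker at an unreachable host (does.notexist.com:666), so any map-reduce task that escaped local mode would fail outright instead of passing silently.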

Added: hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/sample_islocalmode_hook.q Tue Jul 26 17:13:29 2011
@@ -0,0 +1,42 @@
+drop table if exists sih_i_part;
+drop table if exists sih_src;
+drop table if exists sih_src2;
+
+set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+set mapred.max.split.size=300;
+set mapred.min.split.size=300;
+set mapred.min.split.size.per.node=300;
+set mapred.min.split.size.per.rack=300;
+set hive.exec.mode.local.auto=true;
+set hive.merge.smallfiles.avgsize=1;
+
+-- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string);
+insert overwrite table sih_i_part partition (p='1') select key, value from src;
+insert overwrite table sih_i_part partition (p='2') select key+10000, value from src;
+insert overwrite table sih_i_part partition (p='3') select key+20000, value from src;
+create table sih_src as select key, value from sih_i_part;
+create table sih_src2 as select key, value from sih_src;
+
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.VerifyIsLocalModeHook;
+set mapred.job.tracker=does.notexist.com:666;
+set hive.exec.mode.local.auto.tasks.max=1;
+
+-- sample split, running locally limited by num tasks
+select count(1) from sih_src tablesample(1 percent);
+
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample two tables
+select count(1) from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key;
+
+set hive.exec.mode.local.auto.inputbytes.max=1000;
+set hive.exec.mode.local.auto.tasks.max=4;
+set mapred.job.tracker=does.notexist.com:666;
+
+-- sample split, running locally limited by max bytes
+select count(1) from sih_src tablesample(1 percent);
+
+drop table sih_i_part;
+drop table sih_src;
+drop table sih_src2;

Added: hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out?rev=1151175&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/sample_islocalmode_hook.q.out Tue Jul 26 17:13:29 2011
@@ -0,0 +1,116 @@
+PREHOOK: query: drop table if exists sih_i_part
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_i_part
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists sih_src
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_src
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists sih_src2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists sih_src2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: -- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- create file inputs
+create table sih_i_part (key int, value string) partitioned by (p string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@sih_i_part
+PREHOOK: query: insert overwrite table sih_i_part partition (p='1') select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=1
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='1') select key, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=1
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table sih_i_part partition (p='2') select key+10000, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=2
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='2') select key+10000, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=2
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: insert overwrite table sih_i_part partition (p='3') select key+20000, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@sih_i_part@p=3
+POSTHOOK: query: insert overwrite table sih_i_part partition (p='3') select key+20000, value from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@sih_i_part@p=3
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table sih_src as select key, value from sih_i_part
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@sih_i_part@p=1
+PREHOOK: Input: default@sih_i_part@p=2
+PREHOOK: Input: default@sih_i_part@p=3
+POSTHOOK: query: create table sih_src as select key, value from sih_i_part
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@sih_i_part@p=1
+POSTHOOK: Input: default@sih_i_part@p=2
+POSTHOOK: Input: default@sih_i_part@p=3
+POSTHOOK: Output: default@sih_src
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: create table sih_src2 as select key, value from sih_src
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@sih_src
+POSTHOOK: query: create table sih_src2 as select key, value from sih_src
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@sih_src
+POSTHOOK: Output: default@sih_src2
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: sih_i_part PARTITION(p=3).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: -- sample split, running locally limited by num tasks
+select count(1) from sih_src tablesample(1 percent)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-00_619_5856650519690274700/-mr-10000
+500
+PREHOOK: query: -- sample two tables
+select count(1) from sih_src tablesample(1 percent)a join sih_src2 tablesample(1 percent)b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Input: default@sih_src2
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-04_069_8883954538684297085/-mr-10000
+0
+PREHOOK: query: -- sample split, running locally limited by max bytes
+select count(1) from sih_src tablesample(1 percent)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: file:/data/users/kevinwilfong/trunk/VENDOR.hive/trunk/build/ql/scratchdir/hive_2011-07-22_10-31-10_389_1530285320067549321/-mr-10000
+500
+PREHOOK: query: drop table sih_i_part
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_i_part
+PREHOOK: Output: default@sih_i_part
+PREHOOK: query: drop table sih_src
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_src
+PREHOOK: Output: default@sih_src
+PREHOOK: query: drop table sih_src2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@sih_src2
+PREHOOK: Output: default@sih_src2