Posted to commits@hive.apache.org by nz...@apache.org on 2010/07/29 04:41:19 UTC

svn commit: r980297 [1/16] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/conf/ metastore/src/test/org/apache/hadoop/hive/metastore/ ql/src/java/org/apache/hadoop/hive/...

Author: nzhang
Date: Thu Jul 29 02:41:14 2010
New Revision: 980297

URL: http://svn.apache.org/viewvc?rev=980297&view=rev
Log:
HIVE-1408. add option to let hive automatically run in local mode based on tunable heuristics (Joydeep Sen Sarma via Ning Zhang)

Added:
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/autolocal1.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/autolocal1.q.out
    hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/
    hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/ProxyFileSystem.java
    hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
    hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/
    hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/ProxyFileSystem.java
    hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
    hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/
    hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/ProxyFileSystem.java
    hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
    hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/
    hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java
    hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/data/conf/hive-site.xml
    hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/ctas.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/input12.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/input39.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/insertexternal1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/join14.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/fs_default_name1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientnegative/fs_default_name2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/combine2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/filter_join_breaktask.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input39.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
    hadoop/hive/trunk/shims/build.xml

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jul 29 02:41:14 2010
@@ -19,6 +19,10 @@ Trunk -  Unreleased
     HIVE-1481. ngrams() UDAF for estimating top-k n-gram frequencies
     (Mayank Lahiri via jvs)
 
+    HIVE-1408. add option to let hive automatically run in local mode based on 
+    tunable heuristics
+    (Joydeep Sen Sarma via Ning Zhang)
+
   IMPROVEMENTS
 
     HIVE-1394. Do not update transient_lastDdlTime if the partition is modified by a housekeeping

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Thu Jul 29 02:41:14 2010
@@ -392,6 +392,14 @@
 
   <target name="gen-test"/>
 
+  <!-- use pfile:/// as warehouse file system in 20 for non miniMR runs -->
+  <condition property="test.warehouse.scheme" value="pfile://" else="">
+    <not>
+      <equals arg1="${clustermode}" arg2="miniMR" />
+    </not>
+  </condition>
+
+
   <!-- target to run the tests -->
   <target name="test"
   	depends="test-conditions,gen-test,compile-test,test-jar,test-init">
@@ -417,7 +425,7 @@
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.silent" value="${test.silent}"/>
       <sysproperty key="test.tmp.dir" value="${build.dir}/tmp"/>
-      <sysproperty key="test.warehouse.dir" value="${build.dir}/test/data/warehouse"/>
+      <sysproperty key="test.warehouse.dir" value="${test.warehouse.scheme}${build.dir}/test/data/warehouse"/>
       <sysproperty key="build.dir" value="${build.dir}"/>
       <sysproperty key="build.dir.hive" value="${build.dir.hive}"/>
 

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Thu Jul 29 02:41:14 2010
@@ -82,7 +82,7 @@ public final class FileUtils {
       }
     }
 
-    return new Path(scheme + ":" + "//" + authority + pathUri.getPath());
+    return new Path(scheme, authority, pathUri.getPath());
   }
 
   private FileUtils() {

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jul 29 02:41:14 2010
@@ -93,6 +93,16 @@ public class HiveConf extends Configurat
     DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
     DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
 
+
+    // should hive determine whether to run in local mode automatically ?
+    LOCALMODEAUTO("hive.exec.mode.local.auto", true),
+    // if yes:
+    // run in local mode only if input bytes is less than this. 128MB by default
+    LOCALMODEMAXBYTES("hive.exec.mode.local.auto.inputbytes.max", 134217728L),
+    // run in local mode only if number of tasks (for map and reduce each) is
+    // less than this
+    LOCALMODEMAXTASKS("hive.exec.mode.local.auto.tasks.max", 4),
+
     // hadoop stuff
     HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
     HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),

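Note on the HiveConf hunk above: the three new ConfVars drive the auto local-mode heuristic. Local mode is only considered when hive.exec.mode.local.auto is on, and only taken when the estimated input size and the map/reduce task counts stay under the two thresholds. Below is a minimal sketch of how a planner might combine them; it is not the commit's actual implementation, and it assumes HiveConf's usual typed accessors (getBoolVar, plus the getLongVar/getIntVar seen elsewhere in this diff).

    import org.apache.hadoop.hive.conf.HiveConf;

    public final class LocalModeHeuristicSketch {
      // True only when auto local mode is enabled and the job is small enough
      // on every axis the new knobs describe.
      public static boolean qualifiesForLocalMode(HiveConf conf,
          long estimatedInputBytes, int numMapTasks, int numReduceTasks) {
        if (!conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
          return false;                                    // feature switched off
        }
        long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
        int maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
        return estimatedInputBytes <= maxBytes             // input below the 128MB default
            && numMapTasks <= maxTasks                     // few enough map tasks
            && numReduceTasks <= maxTasks;                 // few enough reduce tasks
      }
    }
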
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Jul 29 02:41:14 2010
@@ -607,6 +607,12 @@
   numbers, this conf var needs to be set manually.</description>
 </property>
 
+<property>
+  <name>hive.exec.mode.local.auto</name>
+  <value>true</value>
+  <description> Let hive determine whether to run in local mode automatically </description>
+</property>
+
 <!-- HBase Storage Handler Parameters -->
 
 <property>

Modified: hadoop/hive/trunk/data/conf/hive-site.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/conf/hive-site.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/data/conf/hive-site.xml (original)
+++ hadoop/hive/trunk/data/conf/hive-site.xml Thu Jul 29 02:41:14 2010
@@ -60,7 +60,7 @@
 <property>
   <!--  this should eventually be deprecated since the metastore should supply this -->
   <name>hive.metastore.warehouse.dir</name>
-  <value>file://${build.dir}/test/data/warehouse/</value>
+  <value>${test.warehouse.dir}</value>
   <description></description>
 </property>
 
@@ -145,4 +145,19 @@
   <description>Track progress of a task</description>
 </property>
 
+<property>
+  <name>fs.pfile.impl</name>
+  <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+  <description>A proxy for local file system used for cross file system testing</description>
+</property>
+
+<property>
+  <name>hive.exec.mode.local.auto</name>
+  <value>false</value>
+  <description>
+    Let hive determine whether to run in local mode automatically
+    Disabling this for tests so that minimr is not affected
+  </description>
+</property>
+
 </configuration>

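Note on the hive-site.xml hunk above: the fs.pfile.impl mapping is what makes the pfile:// warehouse scheme from build-common.xml resolve to the ProxyLocalFileSystem classes added under shims/ in this commit, so tests exercise the non-default-filesystem code paths while still writing to local disk. A rough illustration follows; the warehouse path is made up.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class PfileResolutionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same mapping as the hive-site.xml property above
        conf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");

        Path warehouse = new Path("pfile:///tmp/hive-test/warehouse"); // illustrative path
        FileSystem fs = FileSystem.get(warehouse.toUri(), conf);       // resolves to the proxy
        fs.mkdirs(warehouse);
        System.out.println(fs.getUri() + " exists=" + fs.exists(warehouse));
      }
    }
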
Modified: hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java (original)
+++ hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java Thu Jul 29 02:41:14 2010
@@ -303,8 +303,8 @@ public class TestHiveMetaStore extends T
       }
       assertTrue("Bad partition spec should have thrown an exception", exceptionThrown);
 
-      FileSystem fs = FileSystem.get(hiveConf);
       Path partPath = new Path(part2.getSd().getLocation());
+      FileSystem fs = FileSystem.get(partPath.toUri(), hiveConf);
 
       assertTrue(fs.exists(partPath));
       ret = client.dropPartition(dbName, tblName, part.getValues(), true);
@@ -683,7 +683,8 @@ public class TestHiveMetaStore extends T
           (tbl2.getPartitionKeys() == null)
               || (tbl2.getPartitionKeys().size() == 0));
 
-      FileSystem fs = FileSystem.get(hiveConf);
+      FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+                                     hiveConf);
       client.dropTable(dbName, tblName);
       assertFalse(fs.exists(new Path(tbl.getSd().getLocation())));
 
@@ -775,7 +776,8 @@ public class TestHiveMetaStore extends T
       assertEquals("Alter table didn't succeed. Num buckets is different ",
           tbl2.getSd().getNumBuckets(), tbl3.getSd().getNumBuckets());
       // check that data has moved
-      FileSystem fs = FileSystem.get(hiveConf);
+      FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+                                     hiveConf);
       assertFalse("old table location still exists", fs.exists(new Path(tbl
           .getSd().getLocation())));
       assertTrue("data did not move to new location", fs.exists(new Path(tbl3

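Note on the TestHiveMetaStore hunks above: they all apply the same pattern of deriving the FileSystem from a location's URI instead of calling FileSystem.get(hiveConf), which always returns the default file system and would look in the wrong place once the test warehouse moves to a non-default scheme such as pfile://. A small helper-style sketch of the idiom (not from the commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class FsForLocationSketch {
      // Resolve the file system that actually owns 'location', whatever its scheme.
      public static FileSystem forLocation(String location, Configuration conf)
          throws IOException {
        Path p = new Path(location);
        return FileSystem.get(p.toUri(), conf);   // not FileSystem.get(conf)
      }
    }
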
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Thu Jul 29 02:41:14 2010
@@ -23,8 +23,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -34,9 +37,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Context for Semantic Analyzers. Usage: not reusable - construct a new one for
@@ -50,38 +55,27 @@ public class Context {
   private Path[] resDirPaths;
   private int resDirFilesNum;
   boolean initialized;
+  String originalTracker = null;
+  private HashMap<String, ContentSummary> pathToCS;
 
-  // all query specific directories are created as sub-directories of queryPath
-  // this applies to all non-local (ie. hdfs) file system tmp folders
-  private Path queryScratchPath;
+  // scratch path to use for all non-local (ie. hdfs) file system tmp folders
+  private final Path nonLocalScratchPath;
 
-
-  // Path without a file system
-  // Used for creating temporary directory on local file system
-  private String localScratchPath;
-
-
-  // Fully Qualified path on the local file system
-  // System.getProperty("java.io.tmpdir") + Path.SEPARATOR
-  // + System.getProperty("user.name") + Path.SEPARATOR + executionId
-  private Path localScratchDir;
-
-  // On the default FileSystem (usually HDFS):
-  // also based on hive.exec.scratchdir which by default is
-  // "/tmp/"+System.getProperty("user.name")+"/hive"
-  private Path MRScratchDir;
+  // scratch directory to use for local file system tmp folders
+  private final String localScratchDir;
 
   // Keeps track of scratch directories created for different scheme/authority
-  private final Map<String, Path> externalScratchDirs = new HashMap<String, Path>();
+  private final Map<String, String> fsScratchDirs = new HashMap<String, String>();
 
-  private HiveConf conf;
+
+  private Configuration conf;
   protected int pathid = 10000;
   protected boolean explain = false;
   private TokenRewriteStream tokenRewriteStream;
 
   String executionId;
 
-  public Context(HiveConf conf) throws IOException {
+  public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
 
@@ -89,216 +83,195 @@ public class Context {
    * Create a Context with a given executionId.  ExecutionId, together with
    * user name and conf, will determine the temporary directory locations.
    */
-  public Context(HiveConf conf, String executionId) throws IOException {
+  public Context(Configuration conf, String executionId)  {
     this.conf = conf;
     this.executionId = executionId;
+    
+    // non-local tmp location is configurable. however it is the same across
+    // all external file systems
+    nonLocalScratchPath =
+      new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
+               executionId);
 
-    localScratchPath = System.getProperty("java.io.tmpdir")
+    // local tmp location is not configurable for now
+    localScratchDir = System.getProperty("java.io.tmpdir")
       + Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
       + executionId;
-
-    queryScratchPath = new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR), executionId);
   }
 
   /**
    * Set the context on whether the current query is an explain query.
-   *
-   * @param value
-   *          true if the query is an explain query, false if not
+   * @param value true if the query is an explain query, false if not
    */
   public void setExplain(boolean value) {
     explain = value;
   }
-
+  
   /**
-   * Find out whether the current query is an explain query.
-   *
+   * Find whether the current query is an explain query
    * @return true if the query is an explain query, false if not
    */
-  public boolean getExplain() {
+  public boolean getExplain () {
     return explain;
   }
 
-  /**
-   * Make a tmp directory for MR intermediate data If URI/Scheme are not
-   * supplied - those implied by the default filesystem will be used (which will
-   * typically correspond to hdfs instance on hadoop cluster).
-   *
-   * @param mkdir  if true, will make the directory. Will throw IOException if that fails.
-   */
-  private Path makeMRScratchDir(HiveConf conf, boolean mkdir)
-      throws IOException {
-
-    Path dir = FileUtils.makeQualified(queryScratchPath, conf);
-
-    if (mkdir) {
-      FileSystem fs = dir.getFileSystem(conf);
-      if (!fs.mkdirs(dir)) {
-        throw new IOException("Cannot make directory: " + dir);
-      }
-    }
-    return dir;
-  }
 
   /**
-   * Make a tmp directory on specified URI Currently will use the same path as
-   * implied by SCRATCHDIR config variable.
-   */
-  private Path makeExternalScratchDir(HiveConf conf, boolean mkdir, URI extURI)
-    throws IOException {
-
-    Path dir = new Path(extURI.getScheme(), extURI.getAuthority(),
-                        queryScratchPath.toUri().getPath());
-
-    if (mkdir) {
-      FileSystem fs = dir.getFileSystem(conf);
-      if (!fs.mkdirs(dir)) {
-        throw new IOException("Cannot make directory: " + dir);
-      }
-    }
-    return dir;
-  }
-
-  /**
-   * Make a tmp directory for local file system.
+   * Get a tmp directory on specified URI
    *
-   * @param mkdir  if true, will make the directory. Will throw IOException if that fails.
-   */
-  private Path makeLocalScratchDir(boolean mkdir)
-      throws IOException {
-
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path dir = fs.makeQualified(new Path(localScratchPath));
-
-    if (mkdir) {
-      if (!fs.mkdirs(dir)) {
-        throw new IOException("Cannot make directory: " + dir);
+   * @param scheme Scheme of the target FS 
+   * @param authority Authority of the target FS
+   * @param mkdir create the directory if true
+   * @param scratchdir path of tmp directory
+   */
+  private String getScratchDir(String scheme, String authority,
+                               boolean mkdir, String scratchDir) {
+
+    String fileSystem =  scheme + ":" + authority;
+    String dir = fsScratchDirs.get(fileSystem);
+
+    if (dir == null) {
+      Path dirPath = new Path(scheme, authority, scratchDir);
+      if (mkdir) {
+        try {
+          FileSystem fs = dirPath.getFileSystem(conf);
+          if (!fs.mkdirs(dirPath))
+            throw new RuntimeException("Cannot make directory: "
+                                       + dirPath.toString());
+        } catch (IOException e) {
+          throw new RuntimeException (e);
+        }
       }
+      dir = dirPath.toString();
+      fsScratchDirs.put(fileSystem, dir);
     }
     return dir;
   }
 
+
   /**
-   * Get a tmp directory on specified URI Will check if this has already been
-   * made (either via MR or Local FileSystem or some other external URI.
+   * Create a local scratch directory on demand and return it.
    */
-  private String getExternalScratchDir(URI extURI) {
+  public String getLocalScratchDir(boolean mkdir) {
     try {
-      String fileSystem = extURI.getScheme() + ":" + extURI.getAuthority();
-      Path dir = externalScratchDirs.get(fileSystem);
-      if (dir == null) {
-        dir = makeExternalScratchDir(conf, !explain, extURI);
-        externalScratchDirs.put(fileSystem, dir);
-      }
-      return dir.toString();
+      FileSystem fs = FileSystem.getLocal(conf);
+      URI uri = fs.getUri();
+      return getScratchDir(uri.getScheme(), uri.getAuthority(),
+                           mkdir, localScratchDir);
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException (e);
     }
   }
 
+
   /**
    * Create a map-reduce scratch directory on demand and return it.
+   * 
    */
   public String getMRScratchDir() {
-    try {
-      // if we are executing entirely on the client side - then
-      // just (re)use the local scratch directory
-      if(isLocalOnlyExecutionMode())
-        return getLocalScratchDir();
 
-      if (MRScratchDir == null) {
-        MRScratchDir = makeMRScratchDir(conf, !explain);
-      }
-      return MRScratchDir.toString();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalArgumentException e) {
-      throw new RuntimeException("Error while making MR scratch "
-          + "directory - check filesystem config (" + e.getCause() + ")", e);
-    }
-  }
+    // if we are executing entirely on the client side - then
+    // just (re)use the local scratch directory
+    if(isLocalOnlyExecutionMode())
+      return getLocalScratchDir(!explain);
 
-  /**
-   * Create a local scratch directory on demand and return it.
-   */
-  public String getLocalScratchDir() {
     try {
-      if (localScratchDir == null) {
-        localScratchDir = makeLocalScratchDir(true);
-      }
-      return localScratchDir.toString();
+      Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf);
+      URI uri = dir.toUri();
+      return getScratchDir(uri.getScheme(), uri.getAuthority(),
+                           !explain, uri.getPath());
+
     } catch (IOException e) {
       throw new RuntimeException(e);
     } catch (IllegalArgumentException e) {
-      throw new RuntimeException("Error while making local scratch "
+      throw new RuntimeException("Error while making MR scratch "
           + "directory - check filesystem config (" + e.getCause() + ")", e);
     }
   }
 
-  private void removeDir(Path p) {
-    try {
-      p.getFileSystem(conf).delete(p, true);
-    } catch (Exception e) {
-      LOG.warn("Error Removing Scratch: "
-          + StringUtils.stringifyException(e));
-    }
+  private String getExternalScratchDir(URI extURI) {
+    return getScratchDir(extURI.getScheme(), extURI.getAuthority(),
+                         !explain, nonLocalScratchPath.toUri().getPath());
   }
 
   /**
    * Remove any created scratch directories.
    */
   private void removeScratchDir() {
-
-    for (Map.Entry<String, Path> p : externalScratchDirs.entrySet()) {
-      removeDir(p.getValue());
-    }
-    externalScratchDirs.clear();
-
-    if (MRScratchDir != null) {
-      removeDir(MRScratchDir);
-      MRScratchDir = null;
-    }
-
-    if (localScratchDir != null) {
-      removeDir(localScratchDir);
-      localScratchDir = null;
+    for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
+      try {
+        Path p = new Path(entry.getValue());
+        p.getFileSystem(conf).delete(p, true);
+      } catch (Exception e) {
+        LOG.warn("Error Removing Scratch: "
+                 + StringUtils.stringifyException(e));
+      }
     }
+    fsScratchDirs.clear();
   }
 
-  /**
-   * Return the next available path in the current scratch dir.
-   */
-  private String nextPath(String base) {
-    return base + Path.SEPARATOR + Integer.toString(pathid++);
+  private String nextPathId() {
+    return Integer.toString(pathid++);
   }
 
+
+  private static final String MR_PREFIX = "-mr-";
+  private static final String EXT_PREFIX = "-ext-";
+  private static final String LOCAL_PREFIX = "-local-";
+
   /**
-   * Check if path is tmp path. the assumption is that all uri's relative to
-   * scratchdir are temporary.
-   *
+   * Check if path is for intermediate data
    * @return true if a uri is a temporary uri for map-reduce intermediate data,
    *         false otherwise
    */
   public boolean isMRTmpFileURI(String uriStr) {
-    return (uriStr.indexOf(executionId) != -1);
+    return (uriStr.indexOf(executionId) != -1) &&
+      (uriStr.indexOf(MR_PREFIX) != -1);
   }
 
   /**
    * Get a path to store map-reduce intermediate data in.
-   *
+   * 
    * @return next available path for map-red intermediate data
    */
   public String getMRTmpFileURI() {
-    return nextPath(getMRScratchDir());
+    return getMRScratchDir() + Path.SEPARATOR + MR_PREFIX +
+      nextPathId();
   }
 
+
+  /**
+   * Given a URI for mapreduce intermediate output, swizzle the 
+   * it to point to the local file system. This can be called in 
+   * case the caller decides to run in local mode (in which case
+   * all intermediate data can be stored locally)
+   *
+   * @param originalURI uri to localize
+   * @return localized path for map-red intermediate data
+   */
+  public String localizeMRTmpFileURI(String originalURI) {
+    Path o = new Path(originalURI);
+    Path mrbase = new Path(getMRScratchDir());
+
+    URI relURI = mrbase.toUri().relativize(o.toUri());
+    if (relURI.equals(o.toUri()))
+      throw new RuntimeException
+        ("Invalid URI: " + originalURI + ", cannot relativize against" +
+         mrbase.toString());
+
+    return getLocalScratchDir(!explain) + Path.SEPARATOR + 
+      relURI.getPath();
+  }
+
+
   /**
    * Get a tmp path on local host to store intermediate data.
    *
    * @return next available tmp path on local fs
    */
   public String getLocalTmpFileURI() {
-    return nextPath(getLocalScratchDir());
+    return getLocalScratchDir(true) + Path.SEPARATOR + LOCAL_PREFIX +
+      nextPathId();
   }
 
   /**
@@ -309,7 +282,8 @@ public class Context {
    * @return next available tmp path on the file system corresponding extURI
    */
   public String getExternalTmpFileURI(URI extURI) {
-    return nextPath(getExternalScratchDir(extURI));
+    return getExternalScratchDir(extURI) +  Path.SEPARATOR + EXT_PREFIX +
+      nextPathId();
   }
 
   /**
@@ -368,6 +342,7 @@ public class Context {
       }
     }
     removeScratchDir();
+    originalTracker = null;
   }
 
   public DataInput getStream() {
@@ -473,10 +448,6 @@ public class Context {
     return executionId;
   }
 
-  public Path getQueryPath() {
-    return queryScratchPath;
-  }
-
   /**
    * Does Hive wants to run tasks entirely on the local machine
    * (where the query is being compiled)?
@@ -484,6 +455,66 @@ public class Context {
    * Today this translates into running hadoop jobs locally
    */
   public boolean isLocalOnlyExecutionMode() {
-    return conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+    return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local");
+  }
+
+  public void setOriginalTracker(String originalTracker) {
+    this.originalTracker = originalTracker;
+  }
+
+  public void restoreOriginalTracker() {
+    if (originalTracker != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HADOOPJT, originalTracker);
+      originalTracker = null;
+    }
+  }
+
+  public void addCS(String path, ContentSummary cs) {
+    if(pathToCS == null)
+      pathToCS = new HashMap<String, ContentSummary> ();
+    pathToCS.put(path, cs);
+  }
+  
+  public ContentSummary getCS(String path) {
+    if(pathToCS == null)
+      pathToCS = new HashMap<String, ContentSummary> ();
+    return pathToCS.get(path);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+
+  /**
+   * Given a mapping from paths to objects, localize any MR tmp paths
+   * @param map mapping from paths to objects
+   */
+  public void localizeKeys(Map<String, Object> map) {
+    for (Map.Entry<String, Object> entry: map.entrySet()) {
+      String path = entry.getKey();
+      if (isMRTmpFileURI(path)) {
+        Object val = entry.getValue();
+        map.remove(path);
+        map.put(localizeMRTmpFileURI(path), val);
+      }
+    }
+  }
+
+  /**
+   * Given a list of paths, localize any MR tmp paths contained therein
+   * @param paths list of paths to be localized
+   */
+  public void localizePaths(List<String> paths) {
+    Iterator<String> iter = paths.iterator();
+    List<String> toAdd = new ArrayList<String> ();
+    while(iter.hasNext()) {
+      String path = iter.next();
+      if (isMRTmpFileURI(path)) {
+        iter.remove();
+        toAdd.add(localizeMRTmpFileURI(path));
+      }
+    }
+    paths.addAll(toAdd);
   }
 }

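Note on the Context.java hunk above: localizeMRTmpFileURI rebases a map-reduce intermediate path onto the local scratch directory by relativizing it against the MR scratch root and re-rooting the remainder. A standalone sketch of that relativize-and-rebase step, with illustrative host and paths only:

    import java.net.URI;

    public final class RelativizeSketch {
      public static void main(String[] args) {
        URI mrBase = URI.create("hdfs://nn:8020/tmp/hive-user/exec-123/");
        URI mrTmp  = URI.create("hdfs://nn:8020/tmp/hive-user/exec-123/-mr-10002");

        URI rel = mrBase.relativize(mrTmp);          // yields "-mr-10002"
        if (rel.equals(mrTmp)) {
          // not under the MR scratch root -> cannot be localized
          throw new RuntimeException("Invalid URI: " + mrTmp);
        }
        // re-root the relative part under the local scratch dir
        String localized = "file:/tmp/user/exec-123" + "/" + rel.getPath();
        System.out.println(localized);
      }
    }
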
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Jul 29 02:41:14 2010
@@ -97,34 +97,6 @@ public class Driver implements CommandPr
     Operator.resetId();
   }
 
-  public int countJobs(List<Task<? extends Serializable>> tasks) {
-    return countJobs(tasks, new ArrayList<Task<? extends Serializable>>());
-  }
-
-  public int countJobs(List<Task<? extends Serializable>> tasks,
-      List<Task<? extends Serializable>> seenTasks) {
-    if (tasks == null) {
-      return 0;
-    }
-    int jobs = 0;
-    for (Task<? extends Serializable> task : tasks) {
-      if (!seenTasks.contains(task)) {
-        seenTasks.add(task);
-
-        if (task instanceof ConditionalTask) {
-          jobs += countJobs(((ConditionalTask) task).getListTasks(), seenTasks);
-        } else if (task.isMapRedTask()) { // this may be true for conditional
-                                          // task, but we will not inc the
-                                          // counter
-          jobs++;
-        }
-
-        jobs += countJobs(task.getChildTasks(), seenTasks);
-      }
-    }
-    return jobs;
-  }
-
   /**
    * Return the status information about the Map-Reduce cluster
    */
@@ -319,7 +291,7 @@ public class Driver implements CommandPr
       // test Only - serialize the query plan and deserialize it
       if("true".equalsIgnoreCase(System.getProperty("test.serialize.qplan"))) {
 
-        String queryPlanFileName = ctx.getLocalScratchDir() + Path.SEPARATOR_CHAR
+        String queryPlanFileName = ctx.getLocalScratchDir(true) + Path.SEPARATOR_CHAR
           + "queryplan.xml";
         LOG.info("query plan = " + queryPlanFileName);
         queryPlanFileName = new Path(queryPlanFileName).toUri().getPath();
@@ -468,7 +440,7 @@ public class Driver implements CommandPr
                 UnixUserGroupInformation.UGI_PROPERTY_NAME));
       }
 
-      int jobs = countJobs(plan.getRootTasks());
+      int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
       if (jobs > 0) {
         console.printInfo("Total MapReduce jobs = " + jobs);
       }
@@ -539,6 +511,10 @@ public class Driver implements CommandPr
         }
       }
 
+      // in case we decided to run everything in local mode, restore the
+      // the jobtracker setting to its initial value
+      ctx.restoreOriginalTracker();
+
       // Get all the post execution hooks and execute them.
       for (PostExecute peh : getPostExecHooks()) {
         peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),

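Note on the Driver.java hunk above: ctx.restoreOriginalTracker() is the tail end of the auto-local-mode handshake. When the driver decides a job is small enough to run locally it remembers the configured job tracker, points HADOOPJT at "local", and restores the original value once execution finishes. A hedged sketch of that handshake, using only the Context and HiveConf calls shown elsewhere in this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Context;

    public final class LocalModeSwitchSketch {
      // Flip the job tracker to "local" for this query, remembering the old value.
      public static void switchToLocal(Configuration conf, Context ctx) {
        String original = HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT);
        if (!"local".equals(original)) {
          ctx.setOriginalTracker(original);                           // remember cluster tracker
          HiveConf.setVar(conf, HiveConf.ConfVars.HADOOPJT, "local"); // run locally
        }
        // After execution, Driver calls ctx.restoreOriginalTracker(), which undoes
        // the switch only if setOriginalTracker() was ever called.
      }
    }
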
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java Thu Jul 29 02:41:14 2010
@@ -77,4 +77,8 @@ public class CollectOperator extends Ope
     }
   }
 
+  @Override
+  public int getType() {
+    return -1;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Thu Jul 29 02:41:14 2010
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
@@ -199,4 +200,11 @@ public class ConditionalTask extends Tas
     }
     return ret;
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    if (getListTasks() != null)
+      for(Task<? extends Serializable> t: getListTasks())
+        t.localizeMRTmpFiles(ctx);
+  }
 }

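Note on the ConditionalTask hunk above: it is one of several tasks in this commit gaining a localizeMRTmpFilesImpl(Context) override (CopyTask and DDLTask follow below). Task.java itself is also modified, though its hunk falls in a later part of this 16-part diff; presumably it exposes a public localizeMRTmpFiles(Context) that delegates to this protected hook. A rough, assumed shape only:

    import java.io.Serializable;
    import org.apache.hadoop.hive.ql.Context;

    // Assumed shape only -- the real Task.java change is in a later part of this commit.
    public abstract class TaskHookSketch<T extends Serializable> {
      public void localizeMRTmpFiles(Context ctx) {
        // each task type rewrites whatever MR tmp URIs it references
        localizeMRTmpFilesImpl(ctx);
      }

      protected abstract void localizeMRTmpFilesImpl(Context ctx);
    }
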
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Thu Jul 29 02:41:14 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -97,4 +98,12 @@ public class CopyTask extends Task<CopyW
   public String getName() {
     return "COPY";
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    // copy task is only used by the load command and 
+    // does not use any map-reduce tmp files
+    // we don't expect to enter this code path at all
+    throw new RuntimeException ("Unexpected call");
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Thu Jul 29 02:41:14 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -424,8 +425,9 @@ public class DDLTask extends Task<DDLWor
     p.setLocation(parentDir);
   }
 
-  private boolean pathExists(FileSystem fs, Path p) throws HiveException {
+  private boolean pathExists(Path p) throws HiveException {
     try {
+      FileSystem fs = p.getFileSystem(conf);
       return fs.exists(p);
     } catch (IOException e) {
       throw new HiveException(e);
@@ -477,16 +479,13 @@ public class DDLTask extends Task<DDLWor
       Path originalDir = new Path(getOriginalLocation(p));
       Path leftOverIntermediateOriginal = new Path(originalDir.getParent(),
           originalDir.getName() + INTERMEDIATE_ORIGINAL_DIR_SUFFIX);
-      try {
-        if (pathExists(leftOverIntermediateOriginal.getFileSystem(conf),
-            leftOverIntermediateOriginal)) {
-          console.printInfo("Deleting " + leftOverIntermediateOriginal +
-              " left over from a previous archiving operation");
-          deleteDir(leftOverIntermediateOriginal);
-        }
-      } catch (IOException e) {
-        throw new HiveException(e);
+
+      if (pathExists(leftOverIntermediateOriginal)) {
+        console.printInfo("Deleting " + leftOverIntermediateOriginal +
+        " left over from a previous archiving operation");
+        deleteDir(leftOverIntermediateOriginal);
       }
+
       throw new HiveException("Specified partition is already archived");
     }
 
@@ -525,12 +524,12 @@ public class DDLTask extends Task<DDLWor
     // ARCHIVE_INTERMEDIATE_DIR_SUFFIX that's the same level as the partition,
     // if it does not already exist. If it does exist, we assume the dir is good
     // to use as the move operation that created it is atomic.
-    if (!pathExists(fs, intermediateArchivedDir) &&
-        !pathExists(fs, intermediateOriginalDir)) {
+    if (!pathExists(intermediateArchivedDir) &&
+        !pathExists(intermediateOriginalDir)) {
 
       // First create the archive in a tmp dir so that if the job fails, the
       // bad files don't pollute the filesystem
-      Path tmpDir = new Path(driverContext.getCtx().getMRScratchDir(), "partlevel");
+      Path tmpDir = new Path(driverContext.getCtx().getExternalTmpFileURI(originalDir.toUri()), "partlevel");
 
       console.printInfo("Creating " + archiveName + " for " + originalDir.toString());
       console.printInfo("in " + tmpDir);
@@ -551,7 +550,7 @@ public class DDLTask extends Task<DDLWor
       // the partition directory. e.g. .../hr=12-intermediate-archived
       try {
         console.printInfo("Moving " + tmpDir + " to " + intermediateArchivedDir);
-        if (pathExists(fs, intermediateArchivedDir)) {
+        if (pathExists(intermediateArchivedDir)) {
           throw new HiveException("The intermediate archive directory already exists.");
         }
         fs.rename(tmpDir, intermediateArchivedDir);
@@ -559,7 +558,7 @@ public class DDLTask extends Task<DDLWor
         throw new HiveException("Error while moving tmp directory");
       }
     } else {
-      if (pathExists(fs, intermediateArchivedDir)) {
+      if (pathExists(intermediateArchivedDir)) {
         console.printInfo("Intermediate archive directory " + intermediateArchivedDir +
         " already exists. Assuming it contains an archived version of the partition");
       }
@@ -571,7 +570,7 @@ public class DDLTask extends Task<DDLWor
 
     // Move the original parent directory to the intermediate original directory
     // if the move hasn't been made already
-    if (!pathExists(fs, intermediateOriginalDir)) {
+    if (!pathExists(intermediateOriginalDir)) {
       console.printInfo("Moving " + originalDir + " to " +
           intermediateOriginalDir);
       moveDir(fs, originalDir, intermediateOriginalDir);
@@ -587,7 +586,7 @@ public class DDLTask extends Task<DDLWor
     // recovery
 
     // Move the intermediate archived directory to the original parent directory
-    if (!pathExists(fs, originalDir)) {
+    if (!pathExists(originalDir)) {
       console.printInfo("Moving " + intermediateArchivedDir + " to " +
           originalDir);
       moveDir(fs, intermediateArchivedDir, originalDir);
@@ -663,15 +662,12 @@ public class DDLTask extends Task<DDLWor
       Path leftOverArchiveDir = new Path(location.getParent(),
           location.getName() + INTERMEDIATE_ARCHIVED_DIR_SUFFIX);
 
-      try {
-        if (pathExists(location.getFileSystem(conf), leftOverArchiveDir)) {
-          console.printInfo("Deleting " + leftOverArchiveDir + " left over " +
-          "from a previous unarchiving operation");
-          deleteDir(leftOverArchiveDir);
-        }
-      } catch (IOException e) {
-        throw new HiveException(e);
+      if (pathExists(leftOverArchiveDir)) {
+        console.printInfo("Deleting " + leftOverArchiveDir + " left over " +
+        "from a previous unarchiving operation");
+        deleteDir(leftOverArchiveDir);
       }
+
       throw new HiveException("Specified partition is not archived");
     }
 
@@ -682,7 +678,9 @@ public class DDLTask extends Task<DDLWor
     Path intermediateExtractedDir = new Path(originalLocation.getParent(),
         originalLocation.getName() + INTERMEDIATE_EXTRACTED_DIR_SUFFIX);
 
-    Path tmpDir = new Path(driverContext.getCtx().getMRScratchDir());
+    Path tmpDir = new Path(driverContext
+          .getCtx()
+          .getExternalTmpFileURI(originalLocation.toUri()));
 
     FileSystem fs = null;
     try {
@@ -727,8 +725,8 @@ public class DDLTask extends Task<DDLWor
     // 5. Change the metadata
     // 6. Delete the archived partition files in intermediate-archive
 
-    if (!pathExists(fs, intermediateExtractedDir) &&
-        !pathExists(fs, intermediateArchiveDir)) {
+    if (!pathExists(intermediateExtractedDir) &&
+        !pathExists(intermediateArchiveDir)) {
       try {
 
         // Copy the files out of the archive into the temporary directory
@@ -765,7 +763,7 @@ public class DDLTask extends Task<DDLWor
     // At this point, we know that the extracted files are in the intermediate
     // extracted dir, or in the the original directory.
 
-    if (!pathExists(fs, intermediateArchiveDir)) {
+    if (!pathExists(intermediateArchiveDir)) {
       try {
         console.printInfo("Moving " + originalLocation + " to " + intermediateArchiveDir);
         fs.rename(originalLocation, intermediateArchiveDir);
@@ -783,7 +781,7 @@ public class DDLTask extends Task<DDLWor
     // If the original location exists here, then it must be the extracted files
     // because in the previous step, we moved the previous original location
     // (containing the archived version of the files) to intermediateArchiveDir
-    if (!pathExists(fs, originalLocation)) {
+    if (!pathExists(originalLocation)) {
       try {
         console.printInfo("Moving " + intermediateExtractedDir + " to " + originalLocation);
         fs.rename(intermediateExtractedDir, originalLocation);
@@ -2124,4 +2122,8 @@ public class DDLTask extends Task<DDLWor
     return "DDL";
   }
 
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    // no-op
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jul 29 02:41:14 2010
@@ -45,13 +45,13 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
@@ -60,7 +60,10 @@ import org.apache.hadoop.hive.ql.history
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -96,8 +99,6 @@ public class ExecDriver extends Task<Map
   protected transient int reduceProgress = 0;
   protected transient boolean success = false; // if job execution is successful
 
-  public static Random randGen = new Random();
-
   /**
    * Constructor when invoked from QL.
    */
@@ -105,7 +106,7 @@ public class ExecDriver extends Task<Map
     super();
   }
 
-  public static String getResourceFiles(Configuration conf,
+  protected static String getResourceFiles(Configuration conf,
       SessionState.ResourceType t) {
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
@@ -178,7 +179,7 @@ public class ExecDriver extends Task<Map
    * used to kill all running jobs in the event of an unexpected shutdown -
    * i.e., the JVM shuts down while there are still jobs running.
    */
-  public static Map<String, String> runningJobKillURIs =
+  private static Map<String, String> runningJobKillURIs =
       Collections.synchronizedMap(new HashMap<String, String>());
 
   /**
@@ -222,7 +223,7 @@ public class ExecDriver extends Task<Map
   /**
    * from StreamJob.java.
    */
-  public void jobInfo(RunningJob rj) {
+  private void jobInfo(RunningJob rj) {
     if (job.get("mapred.job.tracker", "local").equals("local")) {
       console.printInfo("Job running in-process (local Hadoop)");
     } else {
@@ -245,7 +246,7 @@ public class ExecDriver extends Task<Map
    * return this handle from execute and Driver can split execute into start,
    * monitorProgess and postProcess.
    */
-  public static class ExecDriverTaskHandle extends TaskHandle {
+  private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
 
@@ -284,8 +285,7 @@ public class ExecDriver extends Task<Map
    * @return true if fatal errors happened during job execution, false
    *         otherwise.
    */
-  protected boolean checkFatalErrors(TaskHandle t, StringBuilder errMsg) {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
+  private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
     RunningJob rj = th.getRunningJob();
     try {
       Counters ctrs = th.getCounters();
@@ -311,9 +311,7 @@ public class ExecDriver extends Task<Map
     }
   }
 
-  @Override
-  public void progress(TaskHandle taskHandle) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle) taskHandle;
+  private void progress(ExecDriverTaskHandle th) throws IOException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
@@ -404,101 +402,9 @@ public class ExecDriver extends Task<Map
   }
 
   /**
-   * Estimate the number of reducers needed for this job, based on job input,
-   * and configuration parameters.
-   *
-   * @return the number of reducers.
-   */
-  public int estimateNumberOfReducers(HiveConf hive, JobConf job,
-      MapredWork work) throws IOException {
-    if (hive == null) {
-      hive = new HiveConf();
-    }
-    long bytesPerReducer = hive.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-    int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-    long totalInputFileSize = getTotalInputFileSize(job, work);
-
-    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-    return reducers;
-  }
-
-  /**
-   * Set the number of reducers for the mapred work.
-   */
-  protected void setNumberOfReducers() throws IOException {
-    // this is a temporary hack to fix things that are not fixed in the compiler
-    Integer numReducersFromWork = work.getNumReduceTasks();
-
-    if (work.getReducer() == null) {
-      console
-          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
-      work.setNumReduceTasks(Integer.valueOf(0));
-    } else {
-      if (numReducersFromWork >= 0) {
-        console.printInfo("Number of reduce tasks determined at compile time: "
-            + work.getNumReduceTasks());
-      } else if (job.getNumReduceTasks() > 0) {
-        int reducers = job.getNumReduceTasks();
-        work.setNumReduceTasks(reducers);
-        console
-            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
-            + reducers);
-      } else {
-        int reducers = estimateNumberOfReducers(conf, job, work);
-        work.setNumReduceTasks(reducers);
-        console
-            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
-            + reducers);
-
-      }
-      console
-          .printInfo("In order to change the average load for a reducer (in bytes):");
-      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
-          + "=<number>");
-      console.printInfo("In order to limit the maximum number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
-          + "=<number>");
-      console.printInfo("In order to set a constant number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
-          + "=<number>");
-    }
-  }
-
-  /**
-   * Calculate the total size of input files.
-   *
-   * @param job
-   *          the hadoop job conf.
-   * @return the total size in bytes.
-   * @throws IOException
-   */
-  public long getTotalInputFileSize(JobConf job, MapredWork work) throws IOException {
-    long r = 0;
-    // For each input path, calculate the total size.
-    for (String path : work.getPathToAliases().keySet()) {
-      try {
-        Path p = new Path(path);
-        FileSystem fs = p.getFileSystem(job);
-        ContentSummary cs = fs.getContentSummary(p);
-        r += cs.getLength();
-      } catch (IOException e) {
-        LOG.info("Cannot get size of " + path + ". Safely ignored.");
-      }
-    }
-    return r;
-  }
-
-  /**
    * Update counters relevant to this task.
    */
-  @Override
-  public void updateCounters(TaskHandle t) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
+  private void updateCounters(ExecDriverTaskHandle th) throws IOException {
     RunningJob rj = th.getRunningJob();
     mapProgress = Math.round(rj.mapProgress() * 100);
     reduceProgress = Math.round(rj.reduceProgress() * 100);
@@ -543,49 +449,31 @@ public class ExecDriver extends Task<Map
 
     success = true;
 
-    try {
-      setNumberOfReducers();
-    } catch (IOException e) {
-      String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
-          + e.getMessage();
-      console.printError(statusMesg, "\n"
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-      return 1;
-    }
-
     String invalidReason = work.isInvalid();
     if (invalidReason != null) {
       throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
     }
 
-    String hiveScratchDir;
-    if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null) {
-      hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
-    } else {
-      hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
-    }
+    Context ctx = driverContext.getCtx();
+    boolean ctxCreated = false;
+    String emptyScratchDirStr;
+    Path emptyScratchDir;
 
-    String emptyScratchDirStr = null;
-    Path emptyScratchDir = null;
+    try {
+      if (ctx == null) {
+        ctx = new Context(job);
+        ctxCreated = true;
+      }
 
-    int numTries = 3;
-    while (numTries > 0) {
-      emptyScratchDirStr = hiveScratchDir + File.separator
-          + Utilities.randGen.nextInt();
+      emptyScratchDirStr = ctx.getMRTmpFileURI();
       emptyScratchDir = new Path(emptyScratchDirStr);
-
-      try {
-        FileSystem fs = emptyScratchDir.getFileSystem(job);
-        fs.mkdirs(emptyScratchDir);
-        break;
-      } catch (Exception e) {
-        if (numTries > 0) {
-          numTries--;
-        } else {
-          throw new RuntimeException("Failed to make dir "
-              + emptyScratchDir.toString() + " : " + e.getMessage());
-        }
-      }
+      FileSystem fs = emptyScratchDir.getFileSystem(job);
+      fs.mkdirs(emptyScratchDir);
+    } catch (IOException e) {
+      e.printStackTrace();
+      console.printError("Error launching map-reduce job", "\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      return 5;
     }
 
     ShimLoader.getHadoopShims().setNullOutputFormat(job);
@@ -674,13 +562,13 @@ public class ExecDriver extends Task<Map
     if (noName) {
       // This is for a special case to ensure unit tests pass
       HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB"
-          + randGen.nextInt());
+          + Utilities.randGen.nextInt());
     }
 
     try {
       addInputPaths(job, work, emptyScratchDirStr);
 
-      Utilities.setMapRedWork(job, work, hiveScratchDir);
+      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs
@@ -699,19 +587,17 @@ public class ExecDriver extends Task<Map
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
 
-      // add to list of running jobs so in case of abnormal shutdown can kill
-      // it.
+      // add to list of running jobs to kill in case of abnormal shutdown
       runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
           + "&action=kill");
 
-      TaskHandle th = new ExecDriverTaskHandle(jc, rj);
+      ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
       jobInfo(rj);
       progress(th); // success status will be setup inside progress
 
       if (rj == null) {
         // in the corner case where the running job has disappeared from JT
-        // memory
-        // remember that we did actually submit the job.
+        // memory, remember that we did actually submit the job.
         rj = orig_rj;
         success = false;
       }
@@ -743,7 +629,9 @@ public class ExecDriver extends Task<Map
     } finally {
       Utilities.clearMapRedWork(job);
       try {
-        emptyScratchDir.getFileSystem(job).delete(emptyScratchDir, true);
+        if (ctxCreated)
+          ctx.clear();
+
         if (returnVal != 0 && rj != null) {
           rj.killJob();
         }
@@ -796,7 +684,7 @@ public class ExecDriver extends Task<Map
    * @param jobId
    * @return
    */
-  public static String getJobStartMsg(String jobId) {
+  private static String getJobStartMsg(String jobId) {
     return "Starting Job = " + jobId;
   }
 
@@ -1081,16 +969,10 @@ public class ExecDriver extends Task<Map
     // log the list of job conf parameters for reference
     LOG.info(sb.toString());
 
-    URI pathURI = (new Path(planFileName)).toUri();
-    InputStream pathData;
-    if (StringUtils.isEmpty(pathURI.getScheme())) {
-      // default to local file system
-      pathData = new FileInputStream(planFileName);
-    } else {
-      // otherwise may be in hadoop ..
-      FileSystem fs = FileSystem.get(conf);
-      pathData = fs.open(new Path(planFileName));
-    }
+    // the plan file should always be in local directory
+    Path p = new Path(planFileName);
+    FileSystem fs = FileSystem.getLocal(conf);
+    InputStream pathData = fs.open(p);
 
     // this is workaround for hadoop-17 - libjars are not added to classpath of the
     // child process. so we add it here explicitly
@@ -1177,13 +1059,13 @@ public class ExecDriver extends Task<Map
         sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
             + Utilities.randGen.nextInt(), "UTF-8"));
       }
-
+      
       return sb.toString();
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
   }
-
+  
   @Override
   public boolean isMapRedTask() {
     return true;
@@ -1195,19 +1077,6 @@ public class ExecDriver extends Task<Map
     return w.getReducer() != null;
   }
 
-  private boolean isEmptyPath(JobConf job, String path) throws Exception {
-    Path dirPath = new Path(path);
-    FileSystem inpFs = dirPath.getFileSystem(job);
-
-    if (inpFs.exists(dirPath)) {
-      FileStatus[] fStats = inpFs.listStatus(dirPath);
-      if (fStats.length > 0) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   /**
    * Handle an empty/null path for a given alias.
    */
@@ -1309,7 +1178,7 @@ public class ExecDriver extends Task<Map
 
           LOG.info("Adding input file " + path);
 
-          if (!isEmptyPath(job, path)) {
+          if (!Utilities.isEmptyPath(job, path)) {
             FileInputFormat.addInputPaths(job, path);
           } else {
             emptyPaths.add(path);
@@ -1345,6 +1214,53 @@ public class ExecDriver extends Task<Map
 
   @Override
   public String getName() {
-    return "EXEC";
+    return "MAPRED";
+  }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+
+    // localize any map-reduce input paths
+    ctx.localizeKeys((Map<String, Object>)((Object)work.getPathToAliases()));
+    ctx.localizeKeys((Map<String, Object>)((Object)work.getPathToPartitionInfo()));
+
+    // localize any input paths for maplocal work
+    MapredLocalWork l = work.getMapLocalWork();
+    if (l != null) {
+      Map<String, FetchWork> m = l.getAliasToFetchWork();
+      if (m != null) {
+        for (FetchWork fw: m.values()) {
+          String s = fw.getTblDir();
+          if ((s != null) && ctx.isMRTmpFileURI(s))
+            fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+        }
+      }
+    }
+
+    // fix up outputs
+    Map<String, ArrayList<String>> pa = work.getPathToAliases();
+    if (pa != null) {
+      for (List<String> ls: pa.values())
+        for (String a: ls) {
+          ArrayList<Operator<? extends Serializable>> opList = new
+            ArrayList<Operator<? extends Serializable>> ();
+          opList.add(work.getAliasToWork().get(a));
+          
+          while (!opList.isEmpty()) {
+            Operator<? extends Serializable> op = opList.remove(0);
+
+            if (op instanceof FileSinkOperator) {
+              FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
+              String s = fdesc.getDirName();
+              if ((s != null) && ctx.isMRTmpFileURI(s))
+                fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+              ((FileSinkOperator)op).setConf(fdesc);
+            }
+
+            if (op.getChildOperators() != null)
+              opList.addAll(op.getChildOperators());
+          }
+        }
+    }
   }
 }
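
Note on the localizeMRTmpFilesImpl() hooks added above (and in the tasks that follow): they rewrite any plan paths that point into the job's map-reduce scratch space so they can point at a local scratch location instead, presumably for when a job is redirected to local mode. The standalone sketch below only illustrates the idea; the class, helper methods and scratch-directory prefixes are hypothetical and are not the actual Hive Context API.

// Illustrative only: "localizing" an MR tmp URI means rewriting a path under
// the cluster-wide scratch directory to an equivalent path under a local
// scratch directory, so the rest of the plan can stay unchanged.
public class LocalizeUriSketch {
  // hypothetical prefixes; in Hive these would come from the query Context
  private static final String MR_SCRATCH    = "hdfs://namenode:8020/tmp/hive/hive_2010-07-29_0001";
  private static final String LOCAL_SCRATCH = "file:///tmp/hive-local/hive_2010-07-29_0001";

  static boolean isMRTmpUri(String uri) {
    return uri != null && uri.startsWith(MR_SCRATCH);
  }

  static String localize(String uri) {
    // swap the distributed prefix for the local one, keep the suffix
    return isMRTmpUri(uri) ? LOCAL_SCRATCH + uri.substring(MR_SCRATCH.length()) : uri;
  }

  public static void main(String[] args) {
    String fileSinkDir = MR_SCRATCH + "/-ext-10002";
    // prints file:///tmp/hive-local/hive_2010-07-29_0001/-ext-10002
    System.out.println(localize(fileSinkDir));
  }
}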

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Thu Jul 29 02:41:14 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 
 
 /**
@@ -402,4 +403,12 @@ public class ExplainTask extends Task<Ex
   public String getName() {
     return "EXPLAIN";
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    // explain task has nothing to localize 
+    // we don't expect to enter this code path at all
+    throw new RuntimeException ("Unexpected call");
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Jul 29 02:41:14 2010
@@ -282,9 +282,13 @@ public class FetchOperator implements Se
       splitNum = 0;
       serde = tmp.getDeserializerClass().newInstance();
       serde.initialize(job, tmp.getProperties());
-      LOG.debug("Creating fetchTask with deserializer typeinfo: "
-          + serde.getObjectInspector().getTypeName());
-      LOG.debug("deserializer properties: " + tmp.getProperties());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating fetchTask with deserializer typeinfo: "
+                  + serde.getObjectInspector().getTypeName());
+        LOG.debug("deserializer properties: " + tmp.getProperties());
+      }
+
       if (currPart != null) {
         setPrtnDesc();
       }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Thu Jul 29 02:41:14 2010
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -147,4 +148,15 @@ public class FetchTask extends Task<Fetc
   public String getName() {
     return "FETCH";
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    String s = work.getTblDir();
+    if ((s != null) && ctx.isMRTmpFileURI(s))
+      work.setTblDir(ctx.localizeMRTmpFileURI(s));
+
+    ArrayList<String> ls = work.getPartDir();
+    if (ls != null) 
+      ctx.localizePaths(ls);
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Jul 29 02:41:14 2010
@@ -732,7 +732,8 @@ public final class FunctionRegistry {
   }
 
   public static GenericUDAFResolver getGenericUDAFResolver(String functionName) {
-    LOG.debug("Looking up GenericUDAF: " + functionName);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Looking up GenericUDAF: " + functionName);
     FunctionInfo finfo = mFunctions.get(functionName.toLowerCase());
     if (finfo == null) {
       return null;
@@ -870,9 +871,10 @@ public final class FunctionRegistry {
           conversionCost += cost;
         }
       }
-      LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
-          + argumentsPassed + " accepted = " + argumentsAccepted + " method = "
-          + m);
+      if (LOG.isDebugEnabled())
+        LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
+                  + argumentsPassed + " accepted = " + argumentsAccepted +
+                  " method = " + m);
       if (match) {
         // Always choose the function with least implicit conversions.
         if (conversionCost < leastConversionCost) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Thu Jul 29 02:41:14 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
@@ -127,4 +128,9 @@ public class FunctionTask extends Task<F
   public String getName() {
     return "FUNCTION";
   }
-}
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    throw new RuntimeException ("Unexpected call");
+  }
+}
\ No newline at end of file

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Jul 29 02:41:14 2010
@@ -427,4 +427,10 @@ public class MapOperator extends Operato
   public String getName() {
     return "MAP";
   }
+
+  @Override
+  public int getType() {
+    return -1;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jul 29 02:41:14 2010
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.File;
-import java.io.FileOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -28,19 +28,28 @@ import java.util.Properties;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
- * Alternate implementation (to ExecDriver) of spawning a mapreduce task that
- * runs it from a separate jvm. The primary issue with this is the inability to
- * control logging from a separate jvm in a consistent manner
+ * Extension of ExecDriver:
+ * - can optionally spawn a map-reduce task from a separate jvm
+ * - will make last minute adjustments to map-reduce job parameters, viz:
+ *   * estimating number of reducers
+ *   * estimating whether job should run locally
  **/
-public class MapRedTask extends Task<MapredWork> implements Serializable {
+public class MapRedTask extends ExecDriver implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
@@ -48,21 +57,75 @@ public class MapRedTask extends Task<Map
   static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";
   static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
 
+  private transient ContentSummary inputSummary = null;
+  private transient boolean runningViaChild = false;
+
   public MapRedTask() {
     super();
   }
 
+  public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
+    throw new RuntimeException("Illegal Constructor call");
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
 
+    Context ctx = driverContext.getCtx();
+    boolean ctxCreated = false;
+
     try {
+      if (ctx == null) {
+        ctx = new Context(conf);
+        ctxCreated = true;
+      }
+
+      // estimate number of reducers
+      setNumberOfReducers();
+
+      // auto-determine local mode if allowed
+      if (!ctx.isLocalOnlyExecutionMode() &&
+          conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
+
+        if (inputSummary == null)
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+
+        // at this point the number of reducers is precisely defined in the plan
+        int numReducers = work.getNumReduceTasks();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Task: " + getId() + ", Summary: " + 
+                    inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+                    + numReducers);
+        }
+
+        String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+        if (reason == null) {
+          // set the JT to local for the duration of this job
+          ctx.setOriginalTracker(conf.getVar(HiveConf.ConfVars.HADOOPJT));
+          conf.setVar(HiveConf.ConfVars.HADOOPJT, "local");
+          console.printInfo("Selecting local mode for task: " + getId());
+        } else {
+          console.printInfo("Cannot run job locally: " + reason);
+        }
+      }
+
+      runningViaChild =
+        "local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) ||
+        conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
+
+      if (!runningViaChild) {
+        // we are not running this mapred task via child jvm
+        // so directly invoke ExecDriver
+        return super.execute(driverContext);
+      }
+
       // enable assertion
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
       String hiveJar = conf.getJar();
 
       String libJarsOption;
-      String addedJars = ExecDriver.getResourceFiles(conf,
-          SessionState.ResourceType.JAR);
+      String addedJars = getResourceFiles(conf, SessionState.ResourceType.JAR);
       conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
       String auxJars = conf.getAuxJars();
       // Put auxjars and addedjars together into libjars
@@ -80,40 +143,13 @@ public class MapRedTask extends Task<Map
         }
       }
       // Generate the hiveConfArgs after potentially adding the jars
-      String hiveConfArgs = ExecDriver.generateCmdLine(conf);
-      String hiveScratchDir;
-      if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null)
-        hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
-      else
-        hiveScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR);
-
-      File scratchDir = new File(hiveScratchDir);
-
-      // Check if the scratch directory exists. If not, create it.
-      if (!scratchDir.exists()) {
-        LOG.info("Local scratch directory " + scratchDir.getPath()
-                                + " not found. Attempting to create.");
-        if (!scratchDir.mkdirs()) {
-          // Unable to create this directory - it might have been created due
-          // to another process.
-          if (!scratchDir.exists()) {
-            throw new TaskExecutionException(
-                "Cannot create scratch directory "
-                + "\"" +  scratchDir.getPath() + "\". "
-                + "To configure a different directory, "
-                + "set the configuration "
-                + "\"hive.exec.scratchdir\" "
-                + "in the session, or permanently by modifying the "
-                + "appropriate hive configuration file such as hive-site.xml.");
-          }
-        }
-      }
+      String hiveConfArgs = generateCmdLine(conf);
 
+      // write out the plan to a local file
+      Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+      OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
-
-      File planFile = File.createTempFile("plan", ".xml", scratchDir);
-      LOG.info("Generating plan file " + planFile.toString());
-      FileOutputStream out = new FileOutputStream(planFile);
+      LOG.info("Generating plan file " + planPath.toString());
       Utilities.serializeMapRedWork(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System
@@ -127,10 +163,9 @@ public class MapRedTask extends Task<Map
       }
 
       String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
-          + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
+          + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
 
-      String files = ExecDriver.getResourceFiles(conf,
-          SessionState.ResourceType.FILE);
+      String files = getResourceFiles(conf, SessionState.ResourceType.FILE);
       if (!files.isEmpty()) {
         cmdLine = cmdLine + " -files " + files;
       }
@@ -196,27 +231,148 @@ public class MapRedTask extends Task<Map
       e.printStackTrace();
       LOG.error("Exception: " + e.getMessage());
       return (1);
+    } finally {
+      try {
+        // in case we decided to run everything in local mode, restore the
+        // jobtracker setting to its initial value
+        ctx.restoreOriginalTracker();
+
+        // creating the context can create a bunch of files. So make
+        // sure to clear it out
+        if (ctxCreated)
+          ctx.clear();
+
+      } catch (Exception e) {
+        LOG.error("Exception: " + e.getMessage());
+      }
     }
   }
 
   @Override
-  public boolean isMapRedTask() {
-    return true;
+  public boolean mapStarted() {
+    boolean b = super.mapStarted();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public boolean hasReduce() {
-    MapredWork w = getWork();
-    return w.getReducer() != null;
+  public boolean reduceStarted() {
+    boolean b = super.reduceStarted();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public int getType() {
-    return StageType.MAPREDLOCAL;
+  public boolean mapDone() {
+    boolean b = super.mapDone();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public String getName() {
-    return "MAPRED";
+  public boolean reduceDone() {
+    boolean b = super.reduceDone();
+    return runningViaChild ? isdone : b;
+  }
+
+  /**
+   * Set the number of reducers for the mapred work.
+   */
+  private void setNumberOfReducers() throws IOException {
+    // this is a temporary hack to fix things that are not fixed in the compiler
+    Integer numReducersFromWork = work.getNumReduceTasks();
+
+    if (work.getReducer() == null) {
+      console
+          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+      work.setNumReduceTasks(Integer.valueOf(0));
+    } else {
+      if (numReducersFromWork >= 0) {
+        console.printInfo("Number of reduce tasks determined at compile time: "
+            + work.getNumReduceTasks());
+      } else if (job.getNumReduceTasks() > 0) {
+        int reducers = job.getNumReduceTasks();
+        work.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+            + reducers);
+      } else {
+        int reducers = estimateNumberOfReducers();
+        work.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+            + reducers);
+
+      }
+      console
+          .printInfo("In order to change the average load for a reducer (in bytes):");
+      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+          + "=<number>");
+      console.printInfo("In order to limit the maximum number of reducers:");
+      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
+          + "=<number>");
+      console.printInfo("In order to set a constant number of reducers:");
+      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+          + "=<number>");
+    }
+  }
+
+  /**
+   * Estimate the number of reducers needed for this job, based on job input,
+   * and configuration parameters.
+   *
+   * @return the number of reducers.
+   */
+  private int estimateNumberOfReducers() throws IOException {
+    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+    if (inputSummary == null)
+      // compute the summary and stash it away
+      inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+
+    long totalInputFileSize = inputSummary.getLength();
+
+    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+    return reducers;
+  }
+
+  /**
+   * Find out if a job can be run in local mode based on its characteristics
+   *
+   * @param conf Hive Configuration
+   * @param inputSummary summary about the input files for this job
+   * @param numReducers total number of reducers for this job
+   * @return String null if job is eligible for local mode, reason otherwise
+   */
+  public static String isEligibleForLocalMode(HiveConf conf,
+                                               ContentSummary inputSummary,
+                                               int numReducers) {
+
+    long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
+    long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
+
+    // check for max input size
+    if (inputSummary.getLength() > maxBytes)
+      return "Input Size (= " + inputSummary.getLength() + ") is larger than " +
+        HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
+
+    // ideally we would like to do this check based on the number of splits
+    // in the absence of an easy way to get the number of splits, do this
+    // based on the total number of files (pessimistically assuming that
+    // the number of splits equals the number of files in the worst case)
+    if (inputSummary.getFileCount() > maxTasks)
+      return "Number of Input Files (= " + inputSummary.getFileCount() +
+        ") is larger than " +
+        HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + " (= " + maxTasks + ")";
+
+    // since local mode only runs with 1 reducer, make sure that
+    // the number of reducers (set by user or inferred) is <= 1
+    if (numReducers > 1)
+      return "Number of reducers (= " + numReducers + ") is more than 1";
+
+    return null;
   }
 }
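
The two heuristics added to MapRedTask above reduce to a ceiling division for the reducer estimate and three threshold checks for local-mode eligibility. The standalone sketch below is a rough illustration only; the class name, thresholds and example sizes are made up and are not Hive defaults or part of this patch.

// Illustrative sketch of the two "last minute adjustments" MapRedTask now
// performs: estimating the reducer count from total input size, and deciding
// whether the job is small enough to run with the job tracker set to "local".
// All constants below are example values, not Hive configuration defaults.
public class AutoLocalModeSketch {

  // ceil(totalInputFileSize / bytesPerReducer), clamped to [1, maxReducers]
  static int estimateReducers(long totalInputFileSize, long bytesPerReducer, int maxReducers) {
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    return Math.min(maxReducers, reducers);
  }

  // returns null if the job may run locally, otherwise the reason it may not
  static String localModeBlocker(long inputBytes, long inputFiles, int numReducers,
                                 long maxBytes, long maxInputFiles) {
    if (inputBytes > maxBytes) {
      return "input size " + inputBytes + " exceeds " + maxBytes;
    }
    if (inputFiles > maxInputFiles) {
      return "number of input files " + inputFiles + " exceeds " + maxInputFiles;
    }
    if (numReducers > 1) {
      return "number of reducers " + numReducers + " is more than 1";
    }
    return null;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long gb = 1024 * mb;

    // 10GB of input at 1GB per reducer -> 10 reducers; tiny input still gets 1
    System.out.println(estimateReducers(10 * gb, gb, 999));  // 10
    System.out.println(estimateReducers(1024, gb, 999));     // 1

    // 50MB across 3 files with a single reducer fits under 128MB / 4-file limits
    System.out.println(localModeBlocker(50 * mb, 3, 1, 128 * mb, 4));  // null (eligible)
    // the same input with 2 reducers is rejected
    System.out.println(localModeBlocker(50 * mb, 3, 2, 128 * mb, 4));
  }
}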

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jul 29 02:41:14 2010
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.Lo
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -277,4 +278,10 @@ public class MoveTask extends Task<MoveW
   public String getName() {
     return "MOVE";
   }
+
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    // no-op
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Jul 29 02:41:14 2010
@@ -1129,15 +1129,12 @@ public abstract class Operator<T extends
   }
 
   /**
-   * Should be overridden to return the type of the specific operator among the
+   * Return the type of the specific operator among the
    * types in OperatorType.
    *
-   * @return OperatorType.* or -1 if not overridden
+   * @return OperatorType.*
    */
-  public int getType() {
-    assert false;
-    return -1;
-  }
+  public abstract int getType();
 
   public void setGroupKeyObject(Object keyObject) {
     this.groupKeyObject = keyObject;