Posted to commits@hive.apache.org by nz...@apache.org on 2010/07/29 04:41:19 UTC
svn commit: r980297 [1/16] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/conf/
metastore/src/test/org/apache/hadoop/hive/metastore/
ql/src/java/org/apache/hadoop/hive/...
Author: nzhang
Date: Thu Jul 29 02:41:14 2010
New Revision: 980297
URL: http://svn.apache.org/viewvc?rev=980297&view=rev
Log:
HIVE-1408. add option to let hive automatically run in local mode based on tunable heuristics (Joydeep Sen Sarma via Ning Zhang)
Added:
hadoop/hive/trunk/ql/src/test/queries/clientnegative/autolocal1.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/autolocal1.q.out
hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/
hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/ProxyFileSystem.java
hadoop/hive/trunk/shims/src/0.17/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/
hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/ProxyFileSystem.java
hadoop/hive/trunk/shims/src/0.18/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/
hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/ProxyFileSystem.java
hadoop/hive/trunk/shims/src/0.19/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/
hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java
hadoop/hive/trunk/shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/build-common.xml
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/data/conf/hive-site.xml
hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/ctas.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/input12.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/input39.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/insertexternal1.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/join14.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/fs_default_name1.q.out
hadoop/hive/trunk/ql/src/test/results/clientnegative/fs_default_name2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/combine2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/filter_join_breaktask.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input39.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
hadoop/hive/trunk/shims/build.xml
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Jul 29 02:41:14 2010
@@ -19,6 +19,10 @@ Trunk - Unreleased
HIVE-1481. ngrams() UDAF for estimating top-k n-gram frequencies
(Mayank Lahiri via jvs)
+ HIVE-1408. add option to let hive automatically run in local mode based on
+ tunable heuristics
+ (Joydeep Sen Sarma via Ning Zhang)
+
IMPROVEMENTS
HIVE-1394. Do not update transient_lastDdlTime if the partition is modified by a housekeeping
Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Thu Jul 29 02:41:14 2010
@@ -392,6 +392,14 @@
<target name="gen-test"/>
+ <!-- use pfile:/// as the warehouse file system on Hadoop 0.20 for non-miniMR runs -->
+ <condition property="test.warehouse.scheme" value="pfile://" else="">
+ <not>
+ <equals arg1="${clustermode}" arg2="miniMR" />
+ </not>
+ </condition>
+
+
<!-- target to run the tests -->
<target name="test"
depends="test-conditions,gen-test,compile-test,test-jar,test-init">
@@ -417,7 +425,7 @@
<sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
<sysproperty key="test.silent" value="${test.silent}"/>
<sysproperty key="test.tmp.dir" value="${build.dir}/tmp"/>
- <sysproperty key="test.warehouse.dir" value="${build.dir}/test/data/warehouse"/>
+ <sysproperty key="test.warehouse.dir" value="${test.warehouse.scheme}${build.dir}/test/data/warehouse"/>
<sysproperty key="build.dir" value="${build.dir}"/>
<sysproperty key="build.dir.hive" value="${build.dir.hive}"/>
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Thu Jul 29 02:41:14 2010
@@ -82,7 +82,7 @@ public final class FileUtils {
}
}
- return new Path(scheme + ":" + "//" + authority + pathUri.getPath());
+ return new Path(scheme, authority, pathUri.getPath());
}
private FileUtils() {
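
The switch from string concatenation to the three-argument Path constructor matters once schemes like pfile:/// (which carry no authority) are in play: concatenating a null authority bakes the literal string "null" into the path. A minimal sketch of the difference, using only org.apache.hadoop.fs.Path (the values below are made up):

import org.apache.hadoop.fs.Path;

public class PathQualifyDemo {
  public static void main(String[] args) {
    String scheme = "pfile";
    String authority = null; // pfile:/// has no authority component
    // old style: a null authority is stringified into the path
    Path bad = new Path(scheme + ":" + "//" + authority + "/data/warehouse");
    // new style: the constructor handles a missing authority correctly
    Path good = new Path(scheme, authority, "/data/warehouse");
    System.out.println(bad);  // pfile://null/data/warehouse
    System.out.println(good); // pfile:/data/warehouse
  }
}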
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jul 29 02:41:14 2010
@@ -93,6 +93,16 @@ public class HiveConf extends Configurat
DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
+
+ // should Hive determine whether to run in local mode automatically?
+ LOCALMODEAUTO("hive.exec.mode.local.auto", true),
+ // if yes:
+ // run in local mode only if input bytes are less than this. 128MB by default
+ LOCALMODEMAXBYTES("hive.exec.mode.local.auto.inputbytes.max", 134217728L),
+ // run in local mode only if number of tasks (for map and reduce each) is
+ // less than this
+ LOCALMODEMAXTASKS("hive.exec.mode.local.auto.tasks.max", 4),
+
// hadoop stuff
HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
HADOOPCONF("hadoop.config.dir", System.getenv("HADOOP_HOME") + "/conf"),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Jul 29 02:41:14 2010
@@ -607,6 +607,12 @@
numbers, this conf var needs to be set manually.</description>
</property>
+<property>
+ <name>hive.exec.mode.local.auto</name>
+ <value>true</value>
+ <description>Let Hive determine whether to run in local mode automatically</description>
+</property>
+
<!-- HBase Storage Handler Parameters -->
<property>
Modified: hadoop/hive/trunk/data/conf/hive-site.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/conf/hive-site.xml?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/data/conf/hive-site.xml (original)
+++ hadoop/hive/trunk/data/conf/hive-site.xml Thu Jul 29 02:41:14 2010
@@ -60,7 +60,7 @@
<property>
<!-- this should eventually be deprecated since the metastore should supply this -->
<name>hive.metastore.warehouse.dir</name>
- <value>file://${build.dir}/test/data/warehouse/</value>
+ <value>${test.warehouse.dir}</value>
<description></description>
</property>
@@ -145,4 +145,19 @@
<description>Track progress of a task</description>
</property>
+<property>
+ <name>fs.pfile.impl</name>
+ <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
+ <description>A proxy for local file system used for cross file system testing</description>
+</property>
+
+<property>
+ <name>hive.exec.mode.local.auto</name>
+ <value>false</value>
+ <description>
+ Let Hive determine whether to run in local mode automatically.
+ Disabled for tests so that miniMR runs are not affected.
+ </description>
+</property>
+
</configuration>
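
fs.pfile.impl maps the pfile:// scheme onto the new ProxyLocalFileSystem, so tests can exercise cross-file-system code paths (warehouse on one FS, default FS on another) without a real cluster. The ProxyFileSystem sources appear in later parts of this commit; as a rough sketch of the idea only, assuming Hadoop's FilterFileSystem, such a proxy delegates all I/O to the local FS while advertising a different scheme (the real implementation also has to rewrite pfile:// paths back to file:// on the way in, which this sketch omits):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

// Hypothetical sketch, not the committed ProxyLocalFileSystem.
public class PfileSketchFileSystem extends FilterFileSystem {
  public PfileSketchFileSystem() {
    super(new LocalFileSystem());
  }

  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    // all actual I/O goes to the local file system ...
    fs.initialize(URI.create("file:///"), conf);
  }

  @Override
  public URI getUri() {
    // ... but callers comparing scheme/authority see a distinct file system
    return URI.create("pfile:///");
  }
}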
Modified: hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java (original)
+++ hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java Thu Jul 29 02:41:14 2010
@@ -303,8 +303,8 @@ public class TestHiveMetaStore extends T
}
assertTrue("Bad partition spec should have thrown an exception", exceptionThrown);
- FileSystem fs = FileSystem.get(hiveConf);
Path partPath = new Path(part2.getSd().getLocation());
+ FileSystem fs = FileSystem.get(partPath.toUri(), hiveConf);
assertTrue(fs.exists(partPath));
ret = client.dropPartition(dbName, tblName, part.getValues(), true);
@@ -683,7 +683,8 @@ public class TestHiveMetaStore extends T
(tbl2.getPartitionKeys() == null)
|| (tbl2.getPartitionKeys().size() == 0));
- FileSystem fs = FileSystem.get(hiveConf);
+ FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+ hiveConf);
client.dropTable(dbName, tblName);
assertFalse(fs.exists(new Path(tbl.getSd().getLocation())));
@@ -775,7 +776,8 @@ public class TestHiveMetaStore extends T
assertEquals("Alter table didn't succeed. Num buckets is different ",
tbl2.getSd().getNumBuckets(), tbl3.getSd().getNumBuckets());
// check that data has moved
- FileSystem fs = FileSystem.get(hiveConf);
+ FileSystem fs = FileSystem.get((new Path(tbl.getSd().getLocation())).toUri(),
+ hiveConf);
assertFalse("old table location still exists", fs.exists(new Path(tbl
.getSd().getLocation())));
assertTrue("data did not move to new location", fs.exists(new Path(tbl3
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Thu Jul 29 02:41:14 2010
@@ -23,8 +23,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -34,9 +37,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
/**
* Context for Semantic Analyzers. Usage: not reusable - construct a new one for
@@ -50,38 +55,27 @@ public class Context {
private Path[] resDirPaths;
private int resDirFilesNum;
boolean initialized;
+ String originalTracker = null;
+ private HashMap<String, ContentSummary> pathToCS;
- // all query specific directories are created as sub-directories of queryPath
- // this applies to all non-local (ie. hdfs) file system tmp folders
- private Path queryScratchPath;
+ // scratch path to use for all non-local (ie. hdfs) file system tmp folders
+ private final Path nonLocalScratchPath;
-
- // Path without a file system
- // Used for creating temporary directory on local file system
- private String localScratchPath;
-
-
- // Fully Qualified path on the local file system
- // System.getProperty("java.io.tmpdir") + Path.SEPARATOR
- // + System.getProperty("user.name") + Path.SEPARATOR + executionId
- private Path localScratchDir;
-
- // On the default FileSystem (usually HDFS):
- // also based on hive.exec.scratchdir which by default is
- // "/tmp/"+System.getProperty("user.name")+"/hive"
- private Path MRScratchDir;
+ // scratch directory to use for local file system tmp folders
+ private final String localScratchDir;
// Keeps track of scratch directories created for different scheme/authority
- private final Map<String, Path> externalScratchDirs = new HashMap<String, Path>();
+ private final Map<String, String> fsScratchDirs = new HashMap<String, String>();
- private HiveConf conf;
+
+ private Configuration conf;
protected int pathid = 10000;
protected boolean explain = false;
private TokenRewriteStream tokenRewriteStream;
String executionId;
- public Context(HiveConf conf) throws IOException {
+ public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
@@ -89,216 +83,195 @@ public class Context {
* Create a Context with a given executionId. ExecutionId, together with
* user name and conf, will determine the temporary directory locations.
*/
- public Context(HiveConf conf, String executionId) throws IOException {
+ public Context(Configuration conf, String executionId) {
this.conf = conf;
this.executionId = executionId;
+
+ // non-local tmp location is configurable. However, it is the same across
+ // all external file systems
+ nonLocalScratchPath =
+ new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
+ executionId);
- localScratchPath = System.getProperty("java.io.tmpdir")
+ // local tmp location is not configurable for now
+ localScratchDir = System.getProperty("java.io.tmpdir")
+ Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+ executionId;
-
- queryScratchPath = new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR), executionId);
}
/**
* Set the context on whether the current query is an explain query.
- *
- * @param value
- * true if the query is an explain query, false if not
+ * @param value true if the query is an explain query, false if not
*/
public void setExplain(boolean value) {
explain = value;
}
-
+
/**
- * Find out whether the current query is an explain query.
- *
+ * Find whether the current query is an explain query
* @return true if the query is an explain query, false if not
*/
- public boolean getExplain() {
+ public boolean getExplain () {
return explain;
}
- /**
- * Make a tmp directory for MR intermediate data If URI/Scheme are not
- * supplied - those implied by the default filesystem will be used (which will
- * typically correspond to hdfs instance on hadoop cluster).
- *
- * @param mkdir if true, will make the directory. Will throw IOException if that fails.
- */
- private Path makeMRScratchDir(HiveConf conf, boolean mkdir)
- throws IOException {
-
- Path dir = FileUtils.makeQualified(queryScratchPath, conf);
-
- if (mkdir) {
- FileSystem fs = dir.getFileSystem(conf);
- if (!fs.mkdirs(dir)) {
- throw new IOException("Cannot make directory: " + dir);
- }
- }
- return dir;
- }
/**
- * Make a tmp directory on specified URI Currently will use the same path as
- * implied by SCRATCHDIR config variable.
- */
- private Path makeExternalScratchDir(HiveConf conf, boolean mkdir, URI extURI)
- throws IOException {
-
- Path dir = new Path(extURI.getScheme(), extURI.getAuthority(),
- queryScratchPath.toUri().getPath());
-
- if (mkdir) {
- FileSystem fs = dir.getFileSystem(conf);
- if (!fs.mkdirs(dir)) {
- throw new IOException("Cannot make directory: " + dir);
- }
- }
- return dir;
- }
-
- /**
- * Make a tmp directory for local file system.
+ * Get a tmp directory on the specified URI
*
- * @param mkdir if true, will make the directory. Will throw IOException if that fails.
- */
- private Path makeLocalScratchDir(boolean mkdir)
- throws IOException {
-
- FileSystem fs = FileSystem.getLocal(conf);
- Path dir = fs.makeQualified(new Path(localScratchPath));
-
- if (mkdir) {
- if (!fs.mkdirs(dir)) {
- throw new IOException("Cannot make directory: " + dir);
+ * @param scheme Scheme of the target FS
+ * @param authority Authority of the target FS
+ * @param mkdir create the directory if true
+ * @param scratchdir path of tmp directory
+ */
+ private String getScratchDir(String scheme, String authority,
+ boolean mkdir, String scratchDir) {
+
+ String fileSystem = scheme + ":" + authority;
+ String dir = fsScratchDirs.get(fileSystem);
+
+ if (dir == null) {
+ Path dirPath = new Path(scheme, authority, scratchDir);
+ if (mkdir) {
+ try {
+ FileSystem fs = dirPath.getFileSystem(conf);
+ if (!fs.mkdirs(dirPath))
+ throw new RuntimeException("Cannot make directory: "
+ + dirPath.toString());
+ } catch (IOException e) {
+ throw new RuntimeException (e);
+ }
}
+ dir = dirPath.toString();
+ fsScratchDirs.put(fileSystem, dir);
}
return dir;
}
+
/**
- * Get a tmp directory on specified URI Will check if this has already been
- * made (either via MR or Local FileSystem or some other external URI.
+ * Create a local scratch directory on demand and return it.
*/
- private String getExternalScratchDir(URI extURI) {
+ public String getLocalScratchDir(boolean mkdir) {
try {
- String fileSystem = extURI.getScheme() + ":" + extURI.getAuthority();
- Path dir = externalScratchDirs.get(fileSystem);
- if (dir == null) {
- dir = makeExternalScratchDir(conf, !explain, extURI);
- externalScratchDirs.put(fileSystem, dir);
- }
- return dir.toString();
+ FileSystem fs = FileSystem.getLocal(conf);
+ URI uri = fs.getUri();
+ return getScratchDir(uri.getScheme(), uri.getAuthority(),
+ mkdir, localScratchDir);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException (e);
}
}
+
/**
* Create a map-reduce scratch directory on demand and return it.
+ *
*/
public String getMRScratchDir() {
- try {
- // if we are executing entirely on the client side - then
- // just (re)use the local scratch directory
- if(isLocalOnlyExecutionMode())
- return getLocalScratchDir();
- if (MRScratchDir == null) {
- MRScratchDir = makeMRScratchDir(conf, !explain);
- }
- return MRScratchDir.toString();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (IllegalArgumentException e) {
- throw new RuntimeException("Error while making MR scratch "
- + "directory - check filesystem config (" + e.getCause() + ")", e);
- }
- }
+ // if we are executing entirely on the client side - then
+ // just (re)use the local scratch directory
+ if(isLocalOnlyExecutionMode())
+ return getLocalScratchDir(!explain);
- /**
- * Create a local scratch directory on demand and return it.
- */
- public String getLocalScratchDir() {
try {
- if (localScratchDir == null) {
- localScratchDir = makeLocalScratchDir(true);
- }
- return localScratchDir.toString();
+ Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf);
+ URI uri = dir.toUri();
+ return getScratchDir(uri.getScheme(), uri.getAuthority(),
+ !explain, uri.getPath());
+
} catch (IOException e) {
throw new RuntimeException(e);
} catch (IllegalArgumentException e) {
- throw new RuntimeException("Error while making local scratch "
+ throw new RuntimeException("Error while making MR scratch "
+ "directory - check filesystem config (" + e.getCause() + ")", e);
}
}
- private void removeDir(Path p) {
- try {
- p.getFileSystem(conf).delete(p, true);
- } catch (Exception e) {
- LOG.warn("Error Removing Scratch: "
- + StringUtils.stringifyException(e));
- }
+ private String getExternalScratchDir(URI extURI) {
+ return getScratchDir(extURI.getScheme(), extURI.getAuthority(),
+ !explain, nonLocalScratchPath.toUri().getPath());
}
/**
* Remove any created scratch directories.
*/
private void removeScratchDir() {
-
- for (Map.Entry<String, Path> p : externalScratchDirs.entrySet()) {
- removeDir(p.getValue());
- }
- externalScratchDirs.clear();
-
- if (MRScratchDir != null) {
- removeDir(MRScratchDir);
- MRScratchDir = null;
- }
-
- if (localScratchDir != null) {
- removeDir(localScratchDir);
- localScratchDir = null;
+ for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
+ try {
+ Path p = new Path(entry.getValue());
+ p.getFileSystem(conf).delete(p, true);
+ } catch (Exception e) {
+ LOG.warn("Error Removing Scratch: "
+ + StringUtils.stringifyException(e));
+ }
}
+ fsScratchDirs.clear();
}
- /**
- * Return the next available path in the current scratch dir.
- */
- private String nextPath(String base) {
- return base + Path.SEPARATOR + Integer.toString(pathid++);
+ private String nextPathId() {
+ return Integer.toString(pathid++);
}
+
+ private static final String MR_PREFIX = "-mr-";
+ private static final String EXT_PREFIX = "-ext-";
+ private static final String LOCAL_PREFIX = "-local-";
+
/**
- * Check if path is tmp path. the assumption is that all uri's relative to
- * scratchdir are temporary.
- *
+ * Check if path is for intermediate data
* @return true if a uri is a temporary uri for map-reduce intermediate data,
* false otherwise
*/
public boolean isMRTmpFileURI(String uriStr) {
- return (uriStr.indexOf(executionId) != -1);
+ return (uriStr.indexOf(executionId) != -1) &&
+ (uriStr.indexOf(MR_PREFIX) != -1);
}
/**
* Get a path to store map-reduce intermediate data in.
- *
+ *
* @return next available path for map-red intermediate data
*/
public String getMRTmpFileURI() {
- return nextPath(getMRScratchDir());
+ return getMRScratchDir() + Path.SEPARATOR + MR_PREFIX +
+ nextPathId();
}
+
+ /**
+ * Given a URI for mapreduce intermediate output, swizzle it
+ * to point to the local file system. This can be called in
+ * case the caller decides to run in local mode (in which case
+ * all intermediate data can be stored locally)
+ *
+ * @param originalURI uri to localize
+ * @return localized path for map-red intermediate data
+ */
+ public String localizeMRTmpFileURI(String originalURI) {
+ Path o = new Path(originalURI);
+ Path mrbase = new Path(getMRScratchDir());
+
+ URI relURI = mrbase.toUri().relativize(o.toUri());
+ if (relURI.equals(o.toUri()))
+ throw new RuntimeException
+ ("Invalid URI: " + originalURI + ", cannot relativize against" +
+ mrbase.toString());
+
+ return getLocalScratchDir(!explain) + Path.SEPARATOR +
+ relURI.getPath();
+ }
+
+
/**
* Get a tmp path on local host to store intermediate data.
*
* @return next available tmp path on local fs
*/
public String getLocalTmpFileURI() {
- return nextPath(getLocalScratchDir());
+ return getLocalScratchDir(true) + Path.SEPARATOR + LOCAL_PREFIX +
+ nextPathId();
}
/**
@@ -309,7 +282,8 @@ public class Context {
* @return next available tmp path on the file system corresponding extURI
*/
public String getExternalTmpFileURI(URI extURI) {
- return nextPath(getExternalScratchDir(extURI));
+ return getExternalScratchDir(extURI) + Path.SEPARATOR + EXT_PREFIX +
+ nextPathId();
}
/**
@@ -368,6 +342,7 @@ public class Context {
}
}
removeScratchDir();
+ originalTracker = null;
}
public DataInput getStream() {
@@ -473,10 +448,6 @@ public class Context {
return executionId;
}
- public Path getQueryPath() {
- return queryScratchPath;
- }
-
/**
* Does Hive want to run tasks entirely on the local machine
* (where the query is being compiled)?
@@ -484,6 +455,66 @@ public class Context {
* Today this translates into running hadoop jobs locally
*/
public boolean isLocalOnlyExecutionMode() {
- return conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local");
+ return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local");
+ }
+
+ public void setOriginalTracker(String originalTracker) {
+ this.originalTracker = originalTracker;
+ }
+
+ public void restoreOriginalTracker() {
+ if (originalTracker != null) {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HADOOPJT, originalTracker);
+ originalTracker = null;
+ }
+ }
+
+ public void addCS(String path, ContentSummary cs) {
+ if(pathToCS == null)
+ pathToCS = new HashMap<String, ContentSummary> ();
+ pathToCS.put(path, cs);
+ }
+
+ public ContentSummary getCS(String path) {
+ if(pathToCS == null)
+ pathToCS = new HashMap<String, ContentSummary> ();
+ return pathToCS.get(path);
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+
+ /**
+ * Given a mapping from paths to objects, localize any MR tmp paths
+ * @param map mapping from paths to objects
+ */
+ public void localizeKeys(Map<String, Object> map) {
+ for (Map.Entry<String, Object> entry: map.entrySet()) {
+ String path = entry.getKey();
+ if (isMRTmpFileURI(path)) {
+ Object val = entry.getValue();
+ map.remove(path);
+ map.put(localizeMRTmpFileURI(path), val);
+ }
+ }
+ }
+
+ /**
+ * Given a list of paths, localize any MR tmp paths contained therein
+ * @param paths list of paths to be localized
+ */
+ public void localizePaths(List<String> paths) {
+ Iterator<String> iter = paths.iterator();
+ List<String> toAdd = new ArrayList<String> ();
+ while(iter.hasNext()) {
+ String path = iter.next();
+ if (isMRTmpFileURI(path)) {
+ iter.remove();
+ toAdd.add(localizeMRTmpFileURI(path));
+ }
+ }
+ paths.addAll(toAdd);
}
}
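
Two things make the localization machinery above work: scratch paths now carry the execution id plus a -mr-/-ext-/-local- marker (so isMRTmpFileURI can recognize intermediate paths), and rebasing onto the local scratch dir in localizeMRTmpFileURI is a plain java.net.URI.relativize against the MR scratch dir. A worked example of that rebasing step (directory names made up, shapes per the new naming scheme):

import java.net.URI;

public class RelativizeDemo {
  public static void main(String[] args) {
    URI mrBase = URI.create("hdfs://nn:8020/tmp/hive-joe/exec123");
    URI mrTmp = URI.create("hdfs://nn:8020/tmp/hive-joe/exec123/-mr-10001");
    URI rel = mrBase.relativize(mrTmp);        // "-mr-10001"
    String localScratch = "/tmp/joe/exec123";  // hypothetical local scratch dir
    // same leaf name, now rooted on the local file system
    System.out.println(localScratch + "/" + rel.getPath());
  }
}

This is exactly the shape localizeMRTmpFileURI computes before prefixing getLocalScratchDir.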
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Jul 29 02:41:14 2010
@@ -97,34 +97,6 @@ public class Driver implements CommandPr
Operator.resetId();
}
- public int countJobs(List<Task<? extends Serializable>> tasks) {
- return countJobs(tasks, new ArrayList<Task<? extends Serializable>>());
- }
-
- public int countJobs(List<Task<? extends Serializable>> tasks,
- List<Task<? extends Serializable>> seenTasks) {
- if (tasks == null) {
- return 0;
- }
- int jobs = 0;
- for (Task<? extends Serializable> task : tasks) {
- if (!seenTasks.contains(task)) {
- seenTasks.add(task);
-
- if (task instanceof ConditionalTask) {
- jobs += countJobs(((ConditionalTask) task).getListTasks(), seenTasks);
- } else if (task.isMapRedTask()) { // this may be true for conditional
- // task, but we will not inc the
- // counter
- jobs++;
- }
-
- jobs += countJobs(task.getChildTasks(), seenTasks);
- }
- }
- return jobs;
- }
-
/**
* Return the status information about the Map-Reduce cluster
*/
@@ -319,7 +291,7 @@ public class Driver implements CommandPr
// test Only - serialize the query plan and deserialize it
if("true".equalsIgnoreCase(System.getProperty("test.serialize.qplan"))) {
- String queryPlanFileName = ctx.getLocalScratchDir() + Path.SEPARATOR_CHAR
+ String queryPlanFileName = ctx.getLocalScratchDir(true) + Path.SEPARATOR_CHAR
+ "queryplan.xml";
LOG.info("query plan = " + queryPlanFileName);
queryPlanFileName = new Path(queryPlanFileName).toUri().getPath();
@@ -468,7 +440,7 @@ public class Driver implements CommandPr
UnixUserGroupInformation.UGI_PROPERTY_NAME));
}
- int jobs = countJobs(plan.getRootTasks());
+ int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
if (jobs > 0) {
console.printInfo("Total MapReduce jobs = " + jobs);
}
@@ -539,6 +511,10 @@ public class Driver implements CommandPr
}
}
+ // in case we decided to run everything in local mode, restore
+ // the jobtracker setting to its initial value
+ ctx.restoreOriginalTracker();
+
// Get all the post execution hooks and execute them.
for (PostExecute peh : getPostExecHooks()) {
peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
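
countJobs gives way to Utilities.getMRTasks(...), whose body is not in this part of the commit. Judging from the deleted code, it presumably performs the same de-duplicated traversal but collects the map-reduce tasks instead of counting them, so callers can both count and inspect them. A hedged reconstruction:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.Task;

// Hedged reconstruction based on the removed countJobs; the committed
// version of Utilities.getMRTasks is in a later part of this patch.
public class MRTaskCollector {
  public static List<Task<? extends Serializable>> getMRTasks(
      List<Task<? extends Serializable>> tasks) {
    List<Task<? extends Serializable>> found =
        new ArrayList<Task<? extends Serializable>>();
    collect(tasks, new ArrayList<Task<? extends Serializable>>(), found);
    return found;
  }

  private static void collect(List<Task<? extends Serializable>> tasks,
      List<Task<? extends Serializable>> seen,
      List<Task<? extends Serializable>> found) {
    if (tasks == null) {
      return;
    }
    for (Task<? extends Serializable> task : tasks) {
      if (seen.contains(task)) {
        continue;
      }
      seen.add(task);
      if (task instanceof ConditionalTask) {
        collect(((ConditionalTask) task).getListTasks(), seen, found);
      } else if (task.isMapRedTask()) {
        found.add(task); // collect instead of counting
      }
      collect(task.getChildTasks(), seen, found);
    }
  }
}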
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java Thu Jul 29 02:41:14 2010
@@ -77,4 +77,8 @@ public class CollectOperator extends Ope
}
}
+ @Override
+ public int getType() {
+ return -1;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java Thu Jul 29 02:41:14 2010
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
@@ -199,4 +200,11 @@ public class ConditionalTask extends Tas
}
return ret;
}
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ if (getListTasks() != null)
+ for(Task<? extends Serializable> t: getListTasks())
+ t.localizeMRTmpFiles(ctx);
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java Thu Jul 29 02:41:14 2010
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -97,4 +98,12 @@ public class CopyTask extends Task<CopyW
public String getName() {
return "COPY";
}
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ // copy task is only used by the load command and
+ // does not use any map-reduce tmp files
+ // we don't expect to enter this code path at all
+ throw new RuntimeException ("Unexpected call");
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Thu Jul 29 02:41:14 2010
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -424,8 +425,9 @@ public class DDLTask extends Task<DDLWor
p.setLocation(parentDir);
}
- private boolean pathExists(FileSystem fs, Path p) throws HiveException {
+ private boolean pathExists(Path p) throws HiveException {
try {
+ FileSystem fs = p.getFileSystem(conf);
return fs.exists(p);
} catch (IOException e) {
throw new HiveException(e);
@@ -477,16 +479,13 @@ public class DDLTask extends Task<DDLWor
Path originalDir = new Path(getOriginalLocation(p));
Path leftOverIntermediateOriginal = new Path(originalDir.getParent(),
originalDir.getName() + INTERMEDIATE_ORIGINAL_DIR_SUFFIX);
- try {
- if (pathExists(leftOverIntermediateOriginal.getFileSystem(conf),
- leftOverIntermediateOriginal)) {
- console.printInfo("Deleting " + leftOverIntermediateOriginal +
- " left over from a previous archiving operation");
- deleteDir(leftOverIntermediateOriginal);
- }
- } catch (IOException e) {
- throw new HiveException(e);
+
+ if (pathExists(leftOverIntermediateOriginal)) {
+ console.printInfo("Deleting " + leftOverIntermediateOriginal +
+ " left over from a previous archiving operation");
+ deleteDir(leftOverIntermediateOriginal);
}
+
throw new HiveException("Specified partition is already archived");
}
@@ -525,12 +524,12 @@ public class DDLTask extends Task<DDLWor
// ARCHIVE_INTERMEDIATE_DIR_SUFFIX that's the same level as the partition,
// if it does not already exist. If it does exist, we assume the dir is good
// to use as the move operation that created it is atomic.
- if (!pathExists(fs, intermediateArchivedDir) &&
- !pathExists(fs, intermediateOriginalDir)) {
+ if (!pathExists(intermediateArchivedDir) &&
+ !pathExists(intermediateOriginalDir)) {
// First create the archive in a tmp dir so that if the job fails, the
// bad files don't pollute the filesystem
- Path tmpDir = new Path(driverContext.getCtx().getMRScratchDir(), "partlevel");
+ Path tmpDir = new Path(driverContext.getCtx().getExternalTmpFileURI(originalDir.toUri()), "partlevel");
console.printInfo("Creating " + archiveName + " for " + originalDir.toString());
console.printInfo("in " + tmpDir);
@@ -551,7 +550,7 @@ public class DDLTask extends Task<DDLWor
// the partition directory. e.g. .../hr=12-intermediate-archived
try {
console.printInfo("Moving " + tmpDir + " to " + intermediateArchivedDir);
- if (pathExists(fs, intermediateArchivedDir)) {
+ if (pathExists(intermediateArchivedDir)) {
throw new HiveException("The intermediate archive directory already exists.");
}
fs.rename(tmpDir, intermediateArchivedDir);
@@ -559,7 +558,7 @@ public class DDLTask extends Task<DDLWor
throw new HiveException("Error while moving tmp directory");
}
} else {
- if (pathExists(fs, intermediateArchivedDir)) {
+ if (pathExists(intermediateArchivedDir)) {
console.printInfo("Intermediate archive directory " + intermediateArchivedDir +
" already exists. Assuming it contains an archived version of the partition");
}
@@ -571,7 +570,7 @@ public class DDLTask extends Task<DDLWor
// Move the original parent directory to the intermediate original directory
// if the move hasn't been made already
- if (!pathExists(fs, intermediateOriginalDir)) {
+ if (!pathExists(intermediateOriginalDir)) {
console.printInfo("Moving " + originalDir + " to " +
intermediateOriginalDir);
moveDir(fs, originalDir, intermediateOriginalDir);
@@ -587,7 +586,7 @@ public class DDLTask extends Task<DDLWor
// recovery
// Move the intermediate archived directory to the original parent directory
- if (!pathExists(fs, originalDir)) {
+ if (!pathExists(originalDir)) {
console.printInfo("Moving " + intermediateArchivedDir + " to " +
originalDir);
moveDir(fs, intermediateArchivedDir, originalDir);
@@ -663,15 +662,12 @@ public class DDLTask extends Task<DDLWor
Path leftOverArchiveDir = new Path(location.getParent(),
location.getName() + INTERMEDIATE_ARCHIVED_DIR_SUFFIX);
- try {
- if (pathExists(location.getFileSystem(conf), leftOverArchiveDir)) {
- console.printInfo("Deleting " + leftOverArchiveDir + " left over " +
- "from a previous unarchiving operation");
- deleteDir(leftOverArchiveDir);
- }
- } catch (IOException e) {
- throw new HiveException(e);
+ if (pathExists(leftOverArchiveDir)) {
+ console.printInfo("Deleting " + leftOverArchiveDir + " left over " +
+ "from a previous unarchiving operation");
+ deleteDir(leftOverArchiveDir);
}
+
throw new HiveException("Specified partition is not archived");
}
@@ -682,7 +678,9 @@ public class DDLTask extends Task<DDLWor
Path intermediateExtractedDir = new Path(originalLocation.getParent(),
originalLocation.getName() + INTERMEDIATE_EXTRACTED_DIR_SUFFIX);
- Path tmpDir = new Path(driverContext.getCtx().getMRScratchDir());
+ Path tmpDir = new Path(driverContext
+ .getCtx()
+ .getExternalTmpFileURI(originalLocation.toUri()));
FileSystem fs = null;
try {
@@ -727,8 +725,8 @@ public class DDLTask extends Task<DDLWor
// 5. Change the metadata
// 6. Delete the archived partition files in intermediate-archive
- if (!pathExists(fs, intermediateExtractedDir) &&
- !pathExists(fs, intermediateArchiveDir)) {
+ if (!pathExists(intermediateExtractedDir) &&
+ !pathExists(intermediateArchiveDir)) {
try {
// Copy the files out of the archive into the temporary directory
@@ -765,7 +763,7 @@ public class DDLTask extends Task<DDLWor
// At this point, we know that the extracted files are in the intermediate
// extracted dir, or in the original directory.
- if (!pathExists(fs, intermediateArchiveDir)) {
+ if (!pathExists(intermediateArchiveDir)) {
try {
console.printInfo("Moving " + originalLocation + " to " + intermediateArchiveDir);
fs.rename(originalLocation, intermediateArchiveDir);
@@ -783,7 +781,7 @@ public class DDLTask extends Task<DDLWor
// If the original location exists here, then it must be the extracted files
// because in the previous step, we moved the previous original location
// (containing the archived version of the files) to intermediateArchiveDir
- if (!pathExists(fs, originalLocation)) {
+ if (!pathExists(originalLocation)) {
try {
console.printInfo("Moving " + intermediateExtractedDir + " to " + originalLocation);
fs.rename(intermediateExtractedDir, originalLocation);
@@ -2124,4 +2122,8 @@ public class DDLTask extends Task<DDLWor
return "DDL";
}
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ // no-op
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jul 29 02:41:14 2010
@@ -45,13 +45,13 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
@@ -60,7 +60,10 @@ import org.apache.hadoop.hive.ql.history
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -96,8 +99,6 @@ public class ExecDriver extends Task<Map
protected transient int reduceProgress = 0;
protected transient boolean success = false; // if job execution is successful
- public static Random randGen = new Random();
-
/**
* Constructor when invoked from QL.
*/
@@ -105,7 +106,7 @@ public class ExecDriver extends Task<Map
super();
}
- public static String getResourceFiles(Configuration conf,
+ protected static String getResourceFiles(Configuration conf,
SessionState.ResourceType t) {
// fill in local files to be added to the task environment
SessionState ss = SessionState.get();
@@ -178,7 +179,7 @@ public class ExecDriver extends Task<Map
* used to kill all running jobs in the event of an unexpected shutdown -
* i.e., the JVM shuts down while there are still jobs running.
*/
- public static Map<String, String> runningJobKillURIs =
+ private static Map<String, String> runningJobKillURIs =
Collections.synchronizedMap(new HashMap<String, String>());
/**
@@ -222,7 +223,7 @@ public class ExecDriver extends Task<Map
/**
* from StreamJob.java.
*/
- public void jobInfo(RunningJob rj) {
+ private void jobInfo(RunningJob rj) {
if (job.get("mapred.job.tracker", "local").equals("local")) {
console.printInfo("Job running in-process (local Hadoop)");
} else {
@@ -245,7 +246,7 @@ public class ExecDriver extends Task<Map
* return this handle from execute and Driver can split execute into start,
* monitorProgess and postProcess.
*/
- public static class ExecDriverTaskHandle extends TaskHandle {
+ private static class ExecDriverTaskHandle extends TaskHandle {
JobClient jc;
RunningJob rj;
@@ -284,8 +285,7 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false
* otherwise.
*/
- protected boolean checkFatalErrors(TaskHandle t, StringBuilder errMsg) {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
+ private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
RunningJob rj = th.getRunningJob();
try {
Counters ctrs = th.getCounters();
@@ -311,9 +311,7 @@ public class ExecDriver extends Task<Map
}
}
- @Override
- public void progress(TaskHandle taskHandle) throws IOException {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle) taskHandle;
+ private void progress(ExecDriverTaskHandle th) throws IOException {
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
@@ -404,101 +402,9 @@ public class ExecDriver extends Task<Map
}
/**
- * Estimate the number of reducers needed for this job, based on job input,
- * and configuration parameters.
- *
- * @return the number of reducers.
- */
- public int estimateNumberOfReducers(HiveConf hive, JobConf job,
- MapredWork work) throws IOException {
- if (hive == null) {
- hive = new HiveConf();
- }
- long bytesPerReducer = hive.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
- int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
- long totalInputFileSize = getTotalInputFileSize(job, work);
-
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-
- int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
- return reducers;
- }
-
- /**
- * Set the number of reducers for the mapred work.
- */
- protected void setNumberOfReducers() throws IOException {
- // this is a temporary hack to fix things that are not fixed in the compiler
- Integer numReducersFromWork = work.getNumReduceTasks();
-
- if (work.getReducer() == null) {
- console
- .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
- work.setNumReduceTasks(Integer.valueOf(0));
- } else {
- if (numReducersFromWork >= 0) {
- console.printInfo("Number of reduce tasks determined at compile time: "
- + work.getNumReduceTasks());
- } else if (job.getNumReduceTasks() > 0) {
- int reducers = job.getNumReduceTasks();
- work.setNumReduceTasks(reducers);
- console
- .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
- + reducers);
- } else {
- int reducers = estimateNumberOfReducers(conf, job, work);
- work.setNumReduceTasks(reducers);
- console
- .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
- + reducers);
-
- }
- console
- .printInfo("In order to change the average load for a reducer (in bytes):");
- console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
- + "=<number>");
- console.printInfo("In order to limit the maximum number of reducers:");
- console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname
- + "=<number>");
- console.printInfo("In order to set a constant number of reducers:");
- console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
- + "=<number>");
- }
- }
-
- /**
- * Calculate the total size of input files.
- *
- * @param job
- * the hadoop job conf.
- * @return the total size in bytes.
- * @throws IOException
- */
- public long getTotalInputFileSize(JobConf job, MapredWork work) throws IOException {
- long r = 0;
- // For each input path, calculate the total size.
- for (String path : work.getPathToAliases().keySet()) {
- try {
- Path p = new Path(path);
- FileSystem fs = p.getFileSystem(job);
- ContentSummary cs = fs.getContentSummary(p);
- r += cs.getLength();
- } catch (IOException e) {
- LOG.info("Cannot get size of " + path + ". Safely ignored.");
- }
- }
- return r;
- }
-
- /**
* Update counters relevant to this task.
*/
- @Override
- public void updateCounters(TaskHandle t) throws IOException {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
+ private void updateCounters(ExecDriverTaskHandle th) throws IOException {
RunningJob rj = th.getRunningJob();
mapProgress = Math.round(rj.mapProgress() * 100);
reduceProgress = Math.round(rj.reduceProgress() * 100);
@@ -543,49 +449,31 @@ public class ExecDriver extends Task<Map
success = true;
- try {
- setNumberOfReducers();
- } catch (IOException e) {
- String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
- + e.getMessage();
- console.printError(statusMesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
- return 1;
- }
-
String invalidReason = work.isInvalid();
if (invalidReason != null) {
throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
}
- String hiveScratchDir;
- if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null) {
- hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
- } else {
- hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
- }
+ Context ctx = driverContext.getCtx();
+ boolean ctxCreated = false;
+ String emptyScratchDirStr;
+ Path emptyScratchDir;
- String emptyScratchDirStr = null;
- Path emptyScratchDir = null;
+ try {
+ if (ctx == null) {
+ ctx = new Context(job);
+ ctxCreated = true;
+ }
- int numTries = 3;
- while (numTries > 0) {
- emptyScratchDirStr = hiveScratchDir + File.separator
- + Utilities.randGen.nextInt();
+ emptyScratchDirStr = ctx.getMRTmpFileURI();
emptyScratchDir = new Path(emptyScratchDirStr);
-
- try {
- FileSystem fs = emptyScratchDir.getFileSystem(job);
- fs.mkdirs(emptyScratchDir);
- break;
- } catch (Exception e) {
- if (numTries > 0) {
- numTries--;
- } else {
- throw new RuntimeException("Failed to make dir "
- + emptyScratchDir.toString() + " : " + e.getMessage());
- }
- }
+ FileSystem fs = emptyScratchDir.getFileSystem(job);
+ fs.mkdirs(emptyScratchDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ console.printError("Error launching map-reduce job", "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return 5;
}
ShimLoader.getHadoopShims().setNullOutputFormat(job);
@@ -674,13 +562,13 @@ public class ExecDriver extends Task<Map
if (noName) {
// This is for a special case to ensure unit tests pass
HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB"
- + randGen.nextInt());
+ + Utilities.randGen.nextInt());
}
try {
addInputPaths(job, work, emptyScratchDirStr);
- Utilities.setMapRedWork(job, work, hiveScratchDir);
+ Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs
@@ -699,19 +587,17 @@ public class ExecDriver extends Task<Map
HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
}
- // add to list of running jobs so in case of abnormal shutdown can kill
- // it.
+ // add to list of running jobs to kill in case of abnormal shutdown
runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL()
+ "&action=kill");
- TaskHandle th = new ExecDriverTaskHandle(jc, rj);
+ ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
progress(th); // success status will be setup inside progress
if (rj == null) {
// in the corner case where the running job has disappeared from JT
- // memory
- // remember that we did actually submit the job.
+ // memory, remember that we did actually submit the job.
rj = orig_rj;
success = false;
}
@@ -743,7 +629,9 @@ public class ExecDriver extends Task<Map
} finally {
Utilities.clearMapRedWork(job);
try {
- emptyScratchDir.getFileSystem(job).delete(emptyScratchDir, true);
+ if(ctxCreated)
+ ctx.clear();
+
if (returnVal != 0 && rj != null) {
rj.killJob();
}
@@ -796,7 +684,7 @@ public class ExecDriver extends Task<Map
* @param jobId
* @return
*/
- public static String getJobStartMsg(String jobId) {
+ private static String getJobStartMsg(String jobId) {
return "Starting Job = " + jobId;
}
@@ -1081,16 +969,10 @@ public class ExecDriver extends Task<Map
// log the list of job conf parameters for reference
LOG.info(sb.toString());
- URI pathURI = (new Path(planFileName)).toUri();
- InputStream pathData;
- if (StringUtils.isEmpty(pathURI.getScheme())) {
- // default to local file system
- pathData = new FileInputStream(planFileName);
- } else {
- // otherwise may be in hadoop ..
- FileSystem fs = FileSystem.get(conf);
- pathData = fs.open(new Path(planFileName));
- }
+ // the plan file should always be on the local file system
+ Path p = new Path(planFileName);
+ FileSystem fs = FileSystem.getLocal(conf);
+ InputStream pathData = fs.open(p);
// this is workaround for hadoop-17 - libjars are not added to classpath of the
// child process. so we add it here explicitly
@@ -1177,13 +1059,13 @@ public class ExecDriver extends Task<Map
sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
+ Utilities.randGen.nextInt(), "UTF-8"));
}
-
+
return sb.toString();
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
-
+
@Override
public boolean isMapRedTask() {
return true;
@@ -1195,19 +1077,6 @@ public class ExecDriver extends Task<Map
return w.getReducer() != null;
}
- private boolean isEmptyPath(JobConf job, String path) throws Exception {
- Path dirPath = new Path(path);
- FileSystem inpFs = dirPath.getFileSystem(job);
-
- if (inpFs.exists(dirPath)) {
- FileStatus[] fStats = inpFs.listStatus(dirPath);
- if (fStats.length > 0) {
- return false;
- }
- }
- return true;
- }
-
/**
* Handle an empty/null path for a given alias.
*/
@@ -1309,7 +1178,7 @@ public class ExecDriver extends Task<Map
LOG.info("Adding input file " + path);
- if (!isEmptyPath(job, path)) {
+ if (!Utilities.isEmptyPath(job, path)) {
FileInputFormat.addInputPaths(job, path);
} else {
emptyPaths.add(path);
@@ -1345,6 +1214,53 @@ public class ExecDriver extends Task<Map
@Override
public String getName() {
- return "EXEC";
+ return "MAPRED";
+ }
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+
+ // localize any map-reduce input paths
+ ctx.localizeKeys((Map<String, Object>)((Object)work.getPathToAliases()));
+ ctx.localizeKeys((Map<String, Object>)((Object)work.getPathToPartitionInfo()));
+
+ // localize any input paths for map-local work
+ MapredLocalWork l = work.getMapLocalWork();
+ if (l != null) {
+ Map<String, FetchWork> m = l.getAliasToFetchWork();
+ if (m != null) {
+ for (FetchWork fw: m.values()) {
+ String s = fw.getTblDir();
+ if ((s != null) && ctx.isMRTmpFileURI(s))
+ fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+ }
+ }
+ }
+
+ // fix up outputs
+ Map<String, ArrayList<String>> pa = work.getPathToAliases();
+ if (pa != null) {
+ for (List<String> ls: pa.values())
+ for (String a: ls) {
+ ArrayList<Operator<? extends Serializable>> opList = new
+ ArrayList<Operator<? extends Serializable>> ();
+ opList.add(work.getAliasToWork().get(a));
+
+ while (!opList.isEmpty()) {
+ Operator<? extends Serializable> op = opList.remove(0);
+
+ if (op instanceof FileSinkOperator) {
+ FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
+ String s = fdesc.getDirName();
+ if ((s != null) && ctx.isMRTmpFileURI(s))
+ fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+ ((FileSinkOperator)op).setConf(fdesc);
+ }
+
+ if (op.getChildOperators() != null)
+ opList.addAll(op.getChildOperators());
+ }
+ }
+ }
}
}
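
[Editorial note on the hunk above] The new localizeMRTmpFilesImpl walks the operator DAG breadth-first from each alias's root operator and rewrites any FileSinkOperator output directory that still points at a map-reduce tmp URI. Below is a minimal, self-contained sketch of that traversal; OpNode, TMP_PREFIX, isTmpUri and localizeUri are hypothetical stand-ins for Operator, the scratch-dir prefix, Context.isMRTmpFileURI and Context.localizeMRTmpFileURI, not the real Hive APIs.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    // Hypothetical stand-in for Operator<? extends Serializable>.
    class OpNode {
      String outputDir;          // non-null only for sink-like operators
      List<OpNode> children;
    }

    public class LocalizeSketch {
      // Assumed scratch prefix; the real check is Context.isMRTmpFileURI.
      static final String TMP_PREFIX = "hdfs:///tmp/hive/";

      static boolean isTmpUri(String s) {
        return s != null && s.startsWith(TMP_PREFIX);
      }

      // Assumed rewrite; the real one is Context.localizeMRTmpFileURI.
      static String localizeUri(String s) {
        return "file:///tmp/hive-local/" + s.substring(TMP_PREFIX.length());
      }

      // Breadth-first walk mirroring localizeMRTmpFilesImpl: visit every
      // operator reachable from the root and rewrite tmp output URIs in place.
      static void localizeOutputs(OpNode root) {
        Deque<OpNode> queue = new ArrayDeque<OpNode>();
        queue.add(root);
        while (!queue.isEmpty()) {
          OpNode op = queue.removeFirst();
          if (isTmpUri(op.outputDir)) {
            op.outputDir = localizeUri(op.outputDir);
          }
          if (op.children != null) {
            queue.addAll(op.children);
          }
        }
      }
    }

The real implementation seeds the walk from work.getAliasToWork() per alias and only touches FileSinkOperator configs, but the traversal shape is the same.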
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Thu Jul 29 02:41:14 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
/**
@@ -402,4 +403,12 @@ public class ExplainTask extends Task<Ex
public String getName() {
return "EXPLAIN";
}
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ // explain task has nothing to localize
+ // we don't expect to enter this code path at all
+ throw new RuntimeException ("Unexpected call");
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Jul 29 02:41:14 2010
@@ -282,9 +282,13 @@ public class FetchOperator implements Se
splitNum = 0;
serde = tmp.getDeserializerClass().newInstance();
serde.initialize(job, tmp.getProperties());
- LOG.debug("Creating fetchTask with deserializer typeinfo: "
- + serde.getObjectInspector().getTypeName());
- LOG.debug("deserializer properties: " + tmp.getProperties());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Creating fetchTask with deserializer typeinfo: "
+ + serde.getObjectInspector().getTypeName());
+ LOG.debug("deserializer properties: " + tmp.getProperties());
+ }
+
if (currPart != null) {
setPrtnDesc();
}
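
[Editorial note on the hunk above] Both this change and the FunctionRegistry one below wrap debug logging in LOG.isDebugEnabled(). A minimal sketch of why the guard matters: without it, the message string is concatenated (and toString() run on its operands) even when debug logging is disabled. LogSketch and process are made-up names for illustration; the commons-logging calls themselves are real.

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LogSketch {
      private static final Log LOG = LogFactory.getLog(LogSketch.class);

      static void process(Object expensiveToPrint) {
        // The guard skips building "state: " + expensiveToPrint entirely
        // when the debug level is off; an unguarded LOG.debug would still
        // evaluate the concatenation before discarding the message.
        if (LOG.isDebugEnabled()) {
          LOG.debug("state: " + expensiveToPrint);
        }
      }
    }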
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Thu Jul 29 02:41:14 2010
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -147,4 +148,15 @@ public class FetchTask extends Task<Fetc
public String getName() {
return "FETCH";
}
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ String s = work.getTblDir();
+ if ((s != null) && ctx.isMRTmpFileURI(s))
+ work.setTblDir(ctx.localizeMRTmpFileURI(s));
+
+ ArrayList<String> ls = work.getPartDir();
+ if (ls != null)
+ ctx.localizePaths(ls);
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Jul 29 02:41:14 2010
@@ -732,7 +732,8 @@ public final class FunctionRegistry {
}
public static GenericUDAFResolver getGenericUDAFResolver(String functionName) {
- LOG.debug("Looking up GenericUDAF: " + functionName);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Looking up GenericUDAF: " + functionName);
FunctionInfo finfo = mFunctions.get(functionName.toLowerCase());
if (finfo == null) {
return null;
@@ -870,9 +871,10 @@ public final class FunctionRegistry {
conversionCost += cost;
}
}
- LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
- + argumentsPassed + " accepted = " + argumentsAccepted + " method = "
- + m);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
+ + argumentsPassed + " accepted = " + argumentsAccepted +
+ " method = " + m);
if (match) {
// Always choose the function with least implicit conversions.
if (conversionCost < leastConversionCost) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Thu Jul 29 02:41:14 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
@@ -127,4 +128,9 @@ public class FunctionTask extends Task<F
public String getName() {
return "FUNCTION";
}
-}
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ throw new RuntimeException ("Unexpected call");
+ }
+}
\ No newline at end of file
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Jul 29 02:41:14 2010
@@ -427,4 +427,10 @@ public class MapOperator extends Operato
public String getName() {
return "MAP";
}
+
+ @Override
+ public int getType() {
+ return -1;
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jul 29 02:41:14 2010
@@ -18,9 +18,9 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
-import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.io.Serializable;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -28,19 +28,28 @@ import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
/**
- * Alternate implementation (to ExecDriver) of spawning a mapreduce task that
- * runs it from a separate jvm. The primary issue with this is the inability to
- * control logging from a separate jvm in a consistent manner
+ * Extension of ExecDriver:
+ * - can optionally spawn a map-reduce task from a separate jvm
+ * - will make last-minute adjustments to map-reduce job parameters, viz:
+ *   * estimating the number of reducers
+ *   * estimating whether the job should run locally
**/
-public class MapRedTask extends Task<MapredWork> implements Serializable {
+public class MapRedTask extends ExecDriver implements Serializable {
private static final long serialVersionUID = 1L;
@@ -48,21 +57,75 @@ public class MapRedTask extends Task<Map
static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";
static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
+ private transient ContentSummary inputSummary = null;
+ private transient boolean runningViaChild = false;
+
public MapRedTask() {
super();
}
+ public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
+ throw new RuntimeException("Illegal Constructor call");
+ }
+
@Override
public int execute(DriverContext driverContext) {
+ Context ctx = driverContext.getCtx();
+ boolean ctxCreated = false;
+
try {
+ if (ctx == null) {
+ ctx = new Context(conf);
+ ctxCreated = true;
+ }
+
+ // estimate number of reducers
+ setNumberOfReducers();
+
+ // auto-determine local mode if allowed
+ if (!ctx.isLocalOnlyExecutionMode() &&
+ conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
+
+ if (inputSummary == null) {
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ }
+
+ // at this point the number of reducers is precisely defined in the plan
+ int numReducers = work.getNumReduceTasks();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task: " + getId() + ", Summary: " +
+ inputSummary.getLength() + "," + inputSummary.getFileCount() + ","
+ + numReducers);
+ }
+
+ String reason = MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers);
+ if (reason == null) {
+ // set the JT to local for the duration of this job
+ ctx.setOriginalTracker(conf.getVar(HiveConf.ConfVars.HADOOPJT));
+ conf.setVar(HiveConf.ConfVars.HADOOPJT, "local");
+ console.printInfo("Selecting local mode for task: " + getId());
+ } else {
+ console.printInfo("Cannot run job locally: " + reason);
+ }
+ }
+
+ runningViaChild =
+ "local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) ||
+ conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
+
+ if(!runningViaChild) {
+ // we are not running this mapred task via child jvm
+ // so directly invoke ExecDriver
+ return super.execute(driverContext);
+ }
+
// enable assertion
String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
String hiveJar = conf.getJar();
String libJarsOption;
- String addedJars = ExecDriver.getResourceFiles(conf,
- SessionState.ResourceType.JAR);
+ String addedJars = getResourceFiles(conf, SessionState.ResourceType.JAR);
conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
String auxJars = conf.getAuxJars();
// Put auxjars and addedjars together into libjars
@@ -80,40 +143,13 @@ public class MapRedTask extends Task<Map
}
}
// Generate the hiveConfArgs after potentially adding the jars
- String hiveConfArgs = ExecDriver.generateCmdLine(conf);
- String hiveScratchDir;
- if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null)
- hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
- else
- hiveScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR);
-
- File scratchDir = new File(hiveScratchDir);
-
- // Check if the scratch directory exists. If not, create it.
- if (!scratchDir.exists()) {
- LOG.info("Local scratch directory " + scratchDir.getPath()
- + " not found. Attempting to create.");
- if (!scratchDir.mkdirs()) {
- // Unable to create this directory - it might have been created due
- // to another process.
- if (!scratchDir.exists()) {
- throw new TaskExecutionException(
- "Cannot create scratch directory "
- + "\"" + scratchDir.getPath() + "\". "
- + "To configure a different directory, "
- + "set the configuration "
- + "\"hive.exec.scratchdir\" "
- + "in the session, or permanently by modifying the "
- + "appropriate hive configuration file such as hive-site.xml.");
- }
- }
- }
+ String hiveConfArgs = generateCmdLine(conf);
+ // write out the plan to a local file
+ Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+ OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredWork plan = getWork();
-
- File planFile = File.createTempFile("plan", ".xml", scratchDir);
- LOG.info("Generating plan file " + planFile.toString());
- FileOutputStream out = new FileOutputStream(planFile);
+ LOG.info("Generating plan file " + planPath.toString());
Utilities.serializeMapRedWork(plan, out);
String isSilent = "true".equalsIgnoreCase(System
@@ -127,10 +163,9 @@ public class MapRedTask extends Task<Map
}
String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
- + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
+ + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
- String files = ExecDriver.getResourceFiles(conf,
- SessionState.ResourceType.FILE);
+ String files = getResourceFiles(conf, SessionState.ResourceType.FILE);
if (!files.isEmpty()) {
cmdLine = cmdLine + " -files " + files;
}
@@ -196,27 +231,148 @@ public class MapRedTask extends Task<Map
e.printStackTrace();
LOG.error("Exception: " + e.getMessage());
return (1);
+ } finally {
+ try {
+ // in case we decided to run everything in local mode, restore
+ // the jobtracker setting to its initial value
+ ctx.restoreOriginalTracker();
+
+ // creating the context can create a bunch of files. So make
+ // sure to clear it out
+ if(ctxCreated)
+ ctx.clear();
+
+ } catch (Exception e) {
+ LOG.error("Exception: " + e.getMessage());
+ }
}
}
@Override
- public boolean isMapRedTask() {
- return true;
+ public boolean mapStarted() {
+ boolean b = super.mapStarted();
+ return runningViaChild ? isdone : b;
}
@Override
- public boolean hasReduce() {
- MapredWork w = getWork();
- return w.getReducer() != null;
+ public boolean reduceStarted() {
+ boolean b = super.reduceStarted();
+ return runningViaChild ? isdone : b;
}
@Override
- public int getType() {
- return StageType.MAPREDLOCAL;
+ public boolean mapDone() {
+ boolean b = super.mapDone();
+ return runningViaChild ? isdone : b;
}
@Override
- public String getName() {
- return "MAPRED";
+ public boolean reduceDone() {
+ boolean b = super.reduceDone();
+ return runningViaChild ? isdone : b;
+ }
+
+ /**
+ * Set the number of reducers for the mapred work.
+ */
+ private void setNumberOfReducers() throws IOException {
+ // this is a temporary hack to fix things that are not fixed in the compiler
+ Integer numReducersFromWork = work.getNumReduceTasks();
+
+ if (work.getReducer() == null) {
+ console
+ .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+ work.setNumReduceTasks(Integer.valueOf(0));
+ } else {
+ if (numReducersFromWork >= 0) {
+ console.printInfo("Number of reduce tasks determined at compile time: "
+ + work.getNumReduceTasks());
+ } else if (job.getNumReduceTasks() > 0) {
+ int reducers = job.getNumReduceTasks();
+ work.setNumReduceTasks(reducers);
+ console
+ .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ + reducers);
+ } else {
+ int reducers = estimateNumberOfReducers();
+ work.setNumReduceTasks(reducers);
+ console
+ .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+ + reducers);
+
+ }
+ console
+ .printInfo("In order to change the average load for a reducer (in bytes):");
+ console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+ + "=<number>");
+ console.printInfo("In order to limit the maximum number of reducers:");
+ console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname
+ + "=<number>");
+ console.printInfo("In order to set a constant number of reducers:");
+ console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+ + "=<number>");
+ }
+ }
+
+ /**
+ * Estimate the number of reducers needed for this job, based on the job
+ * input and configuration parameters.
+ *
+ * @return the number of reducers.
+ */
+ private int estimateNumberOfReducers() throws IOException {
+ long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+ if (inputSummary == null) {
+ // compute the summary and stash it away
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ }
+
+ long totalInputFileSize = inputSummary.getLength();
+
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+
+ int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+ return reducers;
+ }
+
+ /**
+ * Find out if a job can be run in local mode based on its characteristics.
+ *
+ * @param conf Hive Configuration
+ * @param inputSummary summary about the input files for this job
+ * @param numReducers total number of reducers for this job
+ * @return null if the job is eligible for local mode, otherwise the reason it is not
+ */
+ public static String isEligibleForLocalMode(HiveConf conf,
+ ContentSummary inputSummary,
+ int numReducers) {
+
+ long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
+ long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
+
+ // check for max input size
+ if (inputSummary.getLength() > maxBytes)
+ return "Input Size (= " + maxBytes + ") is larger than " +
+ HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")";
+
+ // ideally we would like to do this check based on the number of splits.
+ // in the absence of an easy way to get the number of splits, do this
+ // based on the total number of files (pessimistically assuming that
+ // splits are equal to the number of files in the worst case)
+ if (inputSummary.getFileCount() > maxTasks)
+ return "Number of Input Files (= " + inputSummary.getFileCount() +
+ ") is larger than " +
+ HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + " (= " + maxTasks + ")";
+
+ // since local mode only runs with 1 reducer, make sure that the
+ // number of reducers (set by user or inferred) is <= 1
+ if (numReducers > 1)
+ return "Number of reducers (= " + numReducers + ") is more than 1";
+
+ return null;
}
}
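
[Editorial note on the hunks above] To make the two new MapRedTask heuristics concrete, here is a small standalone sketch of the reducer estimate and the local-mode veto, with a worked example. The per-reducer byte count and reducer cap are assumptions standing in for hive.exec.reducers.bytes.per.reducer and hive.exec.reducers.max; the local-mode limits stand in for the new LOCALMODEMAXBYTES and LOCALMODEMAXTASKS settings, whose authoritative defaults live in hive-default.xml.

    public class ReducerEstimateSketch {
      // Assumed values; the real ones come from HiveConf / hive-default.xml.
      static final long BYTES_PER_REDUCER = 1000000000L; // ~1GB per reducer
      static final int  MAX_REDUCERS      = 999;

      // Same arithmetic as estimateNumberOfReducers: ceiling division,
      // clamped to the range [1, MAX_REDUCERS].
      static int estimateReducers(long totalInputFileSize) {
        int reducers =
            (int) ((totalInputFileSize + BYTES_PER_REDUCER - 1) / BYTES_PER_REDUCER);
        return Math.min(MAX_REDUCERS, Math.max(1, reducers));
      }

      // Same shape as isEligibleForLocalMode: null means "run locally",
      // anything else is the reason the job must go to the cluster.
      static String localModeVeto(long inputBytes, long fileCount,
                                  int numReducers, long maxBytes, long maxTasks) {
        if (inputBytes > maxBytes) {
          return "input size " + inputBytes + " exceeds " + maxBytes;
        }
        if (fileCount > maxTasks) {
          return "input file count " + fileCount + " exceeds " + maxTasks;
        }
        if (numReducers > 1) {
          return "more than 1 reducer requested";
        }
        return null; // eligible
      }

      public static void main(String[] args) {
        // Worked example: 3.2GB of input rounds up to 4 reducers.
        System.out.println(estimateReducers(3200000000L)); // prints 4
        // With assumed limits of 128MB and 4 files, a 1MB single-file,
        // single-reducer job is eligible for local mode (prints null).
        System.out.println(localModeVeto(1000000L, 1, 1, 128000000L, 4));
      }
    }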
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jul 29 02:41:14 2010
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.Lo
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.util.StringUtils;
/**
@@ -277,4 +278,10 @@ public class MoveTask extends Task<MoveW
public String getName() {
return "MOVE";
}
+
+
+ @Override
+ protected void localizeMRTmpFilesImpl(Context ctx) {
+ // no-op
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=980297&r1=980296&r2=980297&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Jul 29 02:41:14 2010
@@ -1129,15 +1129,12 @@ public abstract class Operator<T extends
}
/**
- * Should be overridden to return the type of the specific operator among the
+ * Return the type of the specific operator among the
* types in OperatorType.
*
- * @return OperatorType.* or -1 if not overridden
+ * @return OperatorType.*
*/
- public int getType() {
- assert false;
- return -1;
- }
+ abstract public int getType();
public void setGroupKeyObject(Object keyObject) {
this.groupKeyObject = keyObject;