Posted to commits@hive.apache.org by he...@apache.org on 2010/02/19 01:58:31 UTC
svn commit: r911664 [1/15] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org...
Author: heyongqiang
Date: Fri Feb 19 00:58:28 2010
New Revision: 911664
URL: http://svn.apache.org/viewvc?rev=911664&view=rev
Log:
HIVE-1178. enforce bucketing for a table. (Namit Jain via He Yongqiang)
Added:
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q
hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket3.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input23.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input42.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part9.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join17.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join9.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner3.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/sample9.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
hadoop/hive/trunk/ql/src/test/results/clientpositive/union_ppr.q.out
hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Feb 19 00:58:28 2010
@@ -27,6 +27,9 @@
HIVE-917. Bucketed Map Join
(He Yongqiang via namit)
+ HIVE-1178. enforce bucketing for a table.
+ (Namit Jain via He Yongqiang)
+
IMPROVEMENTS
HIVE-983. Function from_unixtime takes long.
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Feb 19 00:58:28 2010
@@ -190,6 +190,9 @@
HIVEINPUTFORMAT("hive.input.format", ""),
+ HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
+ HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Fri Feb 19 00:58:28 2010
@@ -308,16 +308,16 @@
<property>
<name>hive.skewjoin.mapjoin.map.tasks</name>
<value>10000</value>
- <description> Determine the number of map task used in the follow up map join job
- for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split
+ <description> Determine the number of map task used in the follow up map join job
+ for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split
to perform a fine grained control.</description>
</property>
<property>
<name>hive.skewjoin.mapjoin.min.split</name>
<value>33554432</value>
- <description> Determine the number of map task at most used in the follow up map join job
- for a skew join by specifying the minimum split size. It should be used together with
+ <description> Determine the number of map task at most used in the follow up map join job
+ for a skew join by specifying the minimum split size. It should be used together with
hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.</description>
</property>
@@ -461,4 +461,10 @@
<description>Whether speculative execution for reducers should be turned on. </description>
</property>
+<property>
+ <name>hive.enforce.bucketing</name>
+ <value>false</value>
+ <description>Whether bucketing is enforced. If true, while inserting into the table, bucketing is enforced. </description>
+</property>
+
</configuration>
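
The property block above documents hive.enforce.bucketing; its companion hive.mapred.partitioner keeps its default in HiveConf.java only. As a rough sketch (not part of this patch), a job configuration could toggle the new behaviour through the ordinary Hadoop setters, and read the partitioner back through the HiveConf accessor the ExecDriver change uses; the fallback to DefaultHivePartitioner is assumed from the ConfVars default added above.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class BucketingConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Turn on the behaviour introduced by HIVE-1178 for this job.
        job.setBoolean("hive.enforce.bucketing", true);

        // ExecDriver resolves the partitioner class through this same accessor;
        // with nothing set it should fall back to the ConfVars default
        // (org.apache.hadoop.hive.ql.io.DefaultHivePartitioner).
        String partitioner = HiveConf.getVar(job, HiveConf.ConfVars.HIVEPARTITIONER);

        System.out.println("hive.enforce.bucketing = "
            + job.getBoolean("hive.enforce.bucketing", false));
        System.out.println("hive.mapred.partitioner = " + partitioner);
      }
    }
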
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Feb 19 00:58:28 2010
@@ -74,6 +74,7 @@
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.varia.NullAppender;
+import org.apache.hadoop.mapred.Partitioner;
/**
* ExecDriver.
@@ -176,10 +177,10 @@
/**
* In Hive, when the user control-c's the command line, any running jobs
* spawned from that command line are best-effort killed.
- *
+ *
* This static constructor registers a shutdown thread to iterate over all the
* running job kill URLs and do a get on them.
- *
+ *
*/
static {
if (new org.apache.hadoop.conf.Configuration().getBoolean(
@@ -271,7 +272,7 @@
* regular joins rather than map-side joins. Fatal errors are indicated by
* counters that are set at execution time. If the counter is non-zero, a
* fatal error occurred. The value of the counter indicates the error type.
- *
+ *
* @return true if fatal errors happened during job execution, false
* otherwise.
*/
@@ -374,7 +375,7 @@
/**
* Estimate the number of reducers needed for this job, based on job input,
* and configuration parameters.
- *
+ *
* @return the number of reducers.
*/
public int estimateNumberOfReducers(HiveConf hive, JobConf job,
@@ -439,7 +440,7 @@
/**
* Calculate the total size of input files.
- *
+ *
* @param job
* the hadoop job conf.
* @return the total size in bytes.
@@ -553,6 +554,14 @@
job.setMapOutputKeyClass(HiveKey.class);
job.setMapOutputValueClass(BytesWritable.class);
+
+ try {
+ job.setPartitionerClass((Class<? extends Partitioner>)
+ (Class.forName(HiveConf.getVar(job, HiveConf.ConfVars.HIVEPARTITIONER))));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+
if (work.getNumMapTasks() != null) {
job.setNumMapTasks(work.getNumMapTasks().intValue());
}
@@ -735,7 +744,7 @@
/**
* This msg pattern is used to track when a job is started.
- *
+ *
* @param jobId
* @return
*/
@@ -745,7 +754,7 @@
/**
* this msg pattern is used to track when a job is successfully done.
- *
+ *
* @param jobId
* @return
*/
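
The ExecDriver hunk above loads the class named by hive.mapred.partitioner at job-setup time and installs it as the shuffle partitioner. A minimal standalone sketch of the same reflection pattern (the class name is the default shipped in this commit; the RuntimeException rethrow mirrors the diff, and PartitionerSetupSketch itself is hypothetical):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    public class PartitionerSetupSketch {
      @SuppressWarnings("unchecked")
      public static void installPartitioner(JobConf job, String className) {
        try {
          // Look the partitioner up by name and hand it to the MapReduce framework.
          job.setPartitionerClass(
              (Class<? extends Partitioner>) Class.forName(className));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e.getMessage());
        }
      }

      public static void main(String[] args) {
        JobConf job = new JobConf();
        installPartitioner(job, "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner");
        System.out.println("Partitioner: " + job.getPartitionerClass().getName());
      }
    }
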
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Feb 19 00:58:28 2010
@@ -20,23 +20,33 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePartitioner;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* File Sink operator implementation.
@@ -55,16 +65,31 @@
}
private static final long serialVersionUID = 1L;
- protected transient RecordWriter outWriter;
protected transient FileSystem fs;
- protected transient Path outPath;
- protected transient Path finalPath;
protected transient Serializer serializer;
protected transient BytesWritable commonKey = new BytesWritable();
protected transient TableIdEnum tabIdEnum = null;
private transient LongWritable row_count;
/**
+ * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets,
+ * it is not a good idea to start so many reducers - if the maximum number of reducers is 100,
+ * each reducer can write 10 files - this way we effectively get 1000 files.
+ */
+ private transient ExprNodeEvaluator[] partitionEval;
+ private transient int totalFiles;
+ private transient int numFiles;
+ private transient boolean multiFileSpray;
+ private transient Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+
+ private transient RecordWriter[] outWriters;
+ private transient Path[] outPaths;
+ private transient Path[] finalPaths;
+ private transient ObjectInspector[] partitionObjectInspectors;
+ private transient HivePartitioner<HiveKey, Object> prtner;
+ private transient HiveKey key = new HiveKey();
+
+ /**
* TableIdEnum.
*
*/
@@ -88,11 +113,11 @@
protected transient boolean autoDelete = false;
- private void commit() throws IOException {
- if (!fs.rename(outPath, finalPath)) {
- throw new IOException("Unable to rename output to: " + finalPath);
+ private void commit(int idx) throws IOException {
+ if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+ throw new IOException("Unable to rename output to: " + finalPaths[idx]);
}
- LOG.info("Committed to output file: " + finalPath);
+ LOG.info("Committed " + outPaths[idx] + " to output file: " + finalPaths[idx]);
}
@Override
@@ -110,42 +135,97 @@
jc = new JobConf(hconf, ExecDriver.class);
}
- int id = conf.getDestTableId();
- if ((id != 0) && (id <= TableIdEnum.values().length)) {
- String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
- tabIdEnum = TableIdEnum.valueOf(enumName);
- row_count = new LongWritable();
- statsMap.put(tabIdEnum, row_count);
+ multiFileSpray = conf.isMultiFileSpray();
+ totalFiles = conf.getTotalFiles();
+ numFiles = conf.getNumFiles();
+
+ if (multiFileSpray) {
+ partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+ int i = 0;
+ for (ExprNodeDesc e : conf.getPartitionCols()) {
+ partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+ }
+ partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
+ prtner = (HivePartitioner<HiveKey, Object>)ReflectionUtils.newInstance(jc.getPartitionerClass(), null);
}
+
+ outWriters = new RecordWriter[numFiles];
+ outPaths = new Path[numFiles];
+ finalPaths = new Path[numFiles];
+
String specPath = conf.getDirName();
Path tmpPath = Utilities.toTempPath(specPath);
- String taskId = Utilities.getTaskId(hconf);
+ Set<Integer> seenBuckets = new HashSet<Integer>();
fs = (new Path(specPath)).getFileSystem(hconf);
- finalPath = new Path(tmpPath, taskId);
- outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
-
- LOG.info("Writing to temp file: FS " + outPath);
HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo()
- .getOutputFileFormatClass().newInstance();
+ .getOutputFileFormatClass().newInstance();
boolean isCompressed = conf.getCompressed();
-
- // The reason to keep these instead of using
- // OutputFormat.getRecordWriter() is that
- // getRecordWriter does not give us enough control over the file name that
- // we create.
Path parent = Utilities.toTempPath(specPath);
- finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc,
- hiveOutputFormat, isCompressed, finalPath);
- final Class<? extends Writable> outputClass = serializer
- .getSerializedClass();
- outWriter = HiveFileFormatUtils.getHiveRecordWriter(jc, conf
- .getTableInfo(), outputClass, conf, outPath);
+ final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+
+ // Create all the files - this is required because empty files need to be created for empty buckets
+ int filesIdx = 0;
+ for (int idx = 0; idx < totalFiles; idx++) {
+ String taskId = Utilities.getTaskId(hconf);
+
+ if (multiFileSpray) {
+ key.setHashCode(idx);
+
+ // Does this hashcode belong to this reducer
+ int numReducers = totalFiles/numFiles;
+
+ if (numReducers > 1) {
+ int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)));
+
+ int reducerIdx = prtner.getPartition(key, null, numReducers);
+ if (currReducer != reducerIdx)
+ continue;
+ }
+
+ int bucketNum = prtner.getBucket(key, null, totalFiles);
+ if (seenBuckets.contains(bucketNum))
+ continue;
+ seenBuckets.add(bucketNum);
+
+ bucketMap.put(bucketNum, filesIdx);
+ taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+ }
+
+ finalPaths[filesIdx] = new Path(tmpPath, taskId);
+ LOG.info("Final Path: FS " + finalPaths[filesIdx]);
+
+ outPaths[filesIdx] = new Path(tmpPath, Utilities.toTempPath(taskId));
+
+ LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
+
+ // The reason to keep these instead of using
+ // OutputFormat.getRecordWriter() is that
+ // getRecordWriter does not give us enough control over the file name that
+ // we create.
+ finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(parent, taskId, jc,
+ hiveOutputFormat, isCompressed, finalPaths[filesIdx]);
+ LOG.info("New Final Path: FS " + finalPaths[filesIdx]);
+
+ outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf
+ .getTableInfo(), outputClass, conf, outPaths[filesIdx]);
+
+ filesIdx++;
+ }
+
+ assert filesIdx == numFiles;
// in recent hadoop versions, use deleteOnExit to clean tmp files.
- autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
- outPath);
+ autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPaths[0]);
+
+ int id = conf.getDestTableId();
+ if ((id != 0) && (id <= TableIdEnum.values().length)) {
+ String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
+ tabIdEnum = TableIdEnum.valueOf(enumName);
+ row_count = new LongWritable();
+ statsMap.put(tabIdEnum, row_count);
+ }
initializeChildren(hconf);
} catch (HiveException e) {
@@ -180,7 +260,21 @@
row_count.set(row_count.get() + 1);
}
- outWriter.write(recordValue);
+ if (!multiFileSpray) {
+ outWriters[0].write(recordValue);
+ }
+ else {
+ int keyHashCode = 0;
+ for (int i = 0; i < partitionEval.length; i++) {
+ Object o = partitionEval[i].evaluate(row);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ }
+ key.setHashCode(keyHashCode);
+ int bucketNum = prtner.getBucket(key, null, totalFiles);
+ int idx = bucketMap.get(bucketNum);
+ outWriters[bucketMap.get(bucketNum)].write(recordValue);
+ }
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
@@ -192,12 +286,14 @@
public void closeOp(boolean abort) throws HiveException {
if (!abort) {
- if (outWriter != null) {
- try {
- outWriter.close(abort);
- commit();
- } catch (IOException e) {
- throw new HiveException(e);
+ for (int idx = 0; idx < numFiles; idx++) {
+ if (outWriters[idx] != null) {
+ try {
+ outWriters[idx].close(abort);
+ commit(idx);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
}
}
} else {
@@ -205,9 +301,11 @@
// Hadoop always call close() even if an Exception was thrown in map() or
// reduce().
try {
- outWriter.close(abort);
- if (!autoDelete) {
- fs.delete(outPath, true);
+ for (int idx = 0; idx < numFiles; idx++) {
+ outWriters[idx].close(abort);
+ if (!autoDelete) {
+ fs.delete(outPaths[idx], true);
+ }
}
} catch (Exception e) {
e.printStackTrace();
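
For reference, the bucket selection in the multi-file spray path of processOp above boils down to combining the partition-column hashes with a factor of 31 and reducing modulo the bucket count. A self-contained sketch of that arithmetic, where plain Object.hashCode stands in for the ObjectInspectorUtils.hashCode call the operator actually makes:

    import java.util.Arrays;
    import java.util.List;

    public class BucketHashSketch {
      // Combine per-column hash codes the way FileSinkOperator.processOp does.
      static int combinedHash(List<?> partitionValues) {
        int keyHashCode = 0;
        for (Object o : partitionValues) {
          keyHashCode = keyHashCode * 31 + (o == null ? 0 : o.hashCode());
        }
        return keyHashCode;
      }

      // Same formula as DefaultHivePartitioner.getBucket.
      static int bucketFor(int keyHashCode, int totalBuckets) {
        return (keyHashCode & Integer.MAX_VALUE) % totalBuckets;
      }

      public static void main(String[] args) {
        List<?> row = Arrays.asList(42, "value_42");
        int hash = combinedHash(row);
        // With 100 buckets, identical partition-column values always land in the same file.
        System.out.println("bucket = " + bucketFor(hash, 100));
      }
    }
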
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Feb 19 00:58:28 2010
@@ -984,6 +984,24 @@
}
/**
+ * Replace the task id from the filename. E.g., replace "000000" out of
+ * "24931_r_000000_0" or "24931_r_000000_0.gz" by 33 to
+ * "24931_r_000033_0" or "24931_r_000033_0.gz"
+ */
+ public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
+ String taskId = getTaskIdFromFilename(filename);
+ String strBucketNum = String.valueOf(bucketNum);
+ int bucketNumLen = strBucketNum.length();
+ int taskIdLen = taskId.length();
+ StringBuffer s = new StringBuffer();
+ for (int i = 0; i < taskIdLen - bucketNumLen; i++) {
+ s.append("0");
+ }
+ String newTaskId = s.toString() + strBucketNum;
+ return filename.replaceAll(taskId, newTaskId);
+ }
+
+ /**
* Remove all temporary files and duplicate (double-committed) files from a
* given directory.
*/
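
The helper added above pads the bucket number with leading zeros to the width of the original task id before substituting it into the filename. A standalone sketch of the same padding logic, using the example from the Javadoc; here the "000000" task id is passed in directly, whereas the real method extracts it via getTaskIdFromFilename:

    public class TaskIdRenameSketch {
      // Pad bucketNum to the width of the original task id, then substitute it
      // into the filename (mirrors Utilities.replaceTaskIdFromFilename).
      static String replaceTaskId(String filename, String taskId, int bucketNum) {
        String strBucketNum = String.valueOf(bucketNum);
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < taskId.length() - strBucketNum.length(); i++) {
          s.append('0');
        }
        return filename.replaceAll(taskId, s + strBucketNum);
      }

      public static void main(String[] args) {
        // "24931_r_000000_0" with bucket 33 becomes "24931_r_000033_0".
        System.out.println(replaceTaskId("24931_r_000000_0", "000000", 33));
      }
    }
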
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Feb 19 00:58:28 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+
+/** Partition keys by their {@link Object#hashCode()}. */
+public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> implements HivePartitioner<K2, V2> {
+
+ /** Use {@link Object#hashCode()} to partition. */
+ public int getBucket(K2 key, V2 value, int numBuckets) {
+ return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
+ }
+
+}
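
Because DefaultHivePartitioner inherits getPartition from Hadoop's HashPartitioner, the reducer a key is shuffled to and the bucket it lands in are derived from the same (hashCode & Integer.MAX_VALUE) value. SemanticAnalyzer arranges for the reducer count to divide the bucket count, so bucket b is always produced by reducer b % numReducers. A hypothetical arithmetic check of that invariant, with plain ints standing in for HiveKey hash codes:

    public class BucketToReducerSketch {
      public static void main(String[] args) {
        int totalBuckets = 1000;  // buckets declared on the target table
        int numReducers = 100;    // reducers actually launched (totalBuckets / numFiles)
        for (int hash : new int[] {7, 123456, -42}) {
          int positive = hash & Integer.MAX_VALUE;
          int bucket = positive % totalBuckets;   // DefaultHivePartitioner.getBucket
          int reducer = positive % numReducers;   // HashPartitioner.getPartition
          // totalBuckets is a multiple of numReducers, so these always agree.
          System.out.println("hash=" + hash + " bucket=" + bucket + " reducer=" + reducer
              + " agrees=" + (reducer == bucket % numReducers));
        }
      }
    }
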
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Fri Feb 19 00:58:28 2010
@@ -50,7 +50,7 @@
* registerOutputFormatSubstitute(Class, Class) getOutputFormatSubstitute(Class)
* are added for backward compatibility. They return the newly added
* HiveOutputFormat for the older ones.
- *
+ *
*/
public final class HiveFileFormatUtils {
@@ -69,7 +69,7 @@
/**
* register a substitute.
- *
+ *
* @param origin
* the class that need to be substituted
* @param substitute
@@ -97,17 +97,17 @@
/**
* get the final output path of a given FileOutputFormat.
- *
+ *
* @param parent
* parent dir of the expected final output path
* @param jc
* job configuration
*/
- public static Path getOutputFormatFinalPath(Path parent, JobConf jc,
+ public static Path getOutputFormatFinalPath(Path parent, String taskId, JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat, boolean isCompressed,
Path defaultFinalPath) throws IOException {
if (hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) {
- return new Path(parent, Utilities.getTaskId(jc)
+ return new Path(parent, taskId
+ Utilities.getFileExtension(jc, isCompressed));
}
return defaultFinalPath;
@@ -131,7 +131,7 @@
/**
* register an InputFormatChecker for a given InputFormat.
- *
+ *
* @param format
* the class that need to be substituted
* @param checker
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java Fri Feb 19 00:58:28 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.mapred.Partitioner;
+
+/** Partition keys by their {@link Object#hashCode()}. */
+public interface HivePartitioner<K2, V2> extends Partitioner<K2, V2> {
+
+ /**
+ * Get the final bucket within a partition.
+ */
+ public int getBucket(K2 key, V2 value, int numBuckets);
+}
+
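
Any class plugged in through hive.mapred.partitioner has to implement this interface when multi-file spray is in play, because FileSinkOperator casts the configured partitioner to HivePartitioner in order to call getBucket. A purely illustrative implementation (not part of the patch) that buckets on the key's string form rather than its hashCode might look like this; configure is required by the old-API Partitioner contract:

    import org.apache.hadoop.hive.ql.io.HivePartitioner;
    import org.apache.hadoop.mapred.JobConf;

    /** Hypothetical partitioner that buckets on the key's string form. */
    public class StringFormHivePartitioner<K2, V2> implements HivePartitioner<K2, V2> {

      public void configure(JobConf job) {
        // No configuration needed for this sketch.
      }

      public int getPartition(K2 key, V2 value, int numReduceTasks) {
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }

      public int getBucket(K2 key, V2 value, int numBuckets) {
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numBuckets;
      }
    }
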
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Feb 19 00:58:28 2010
@@ -300,7 +300,7 @@
/**
* DFS-scan the expressionTree to find all aggregation subtrees and put them
* in aggregations.
- *
+ *
* @param expressionTree
* @param aggregations
* the key to the HashTable is the toStringTree() representation of
@@ -353,7 +353,7 @@
* Goes though the tabref tree and finds the alias for the table. Once found,
* it records the table name-> alias association in aliasToTabs. It also makes
* an association from the alias to the table AST in parse info.
- *
+ *
* @return the alias of the table
*/
private String processTable(QB qb, ASTNode tabref) throws SemanticException {
@@ -469,7 +469,7 @@
/**
* Given the AST with TOK_JOIN as the root, get all the aliases for the tables
* or subqueries in the join.
- *
+ *
* @param qb
* @param join
* @throws SemanticException
@@ -505,7 +505,7 @@
* Given the AST with TOK_LATERAL_VIEW as the root, get the alias for the
* table or subquery in the lateral view and also make a mapping from the
* alias to all the lateral view AST's.
- *
+ *
* @param qb
* @param lateralView
* @return the alias for the table/subquery
@@ -541,7 +541,7 @@
/**
* Phase 1: (including, but not limited to):
- *
+ *
* 1. Gets all the aliases for all the tables / subqueries and makes the
* appropriate mapping in aliasToTabs, aliasToSubq 2. Gets the location of the
* destination and names the clase "inclause" + i 3. Creates a map from a
@@ -549,7 +549,7 @@
* 4. Creates a mapping from the clause name to the select expression AST in
* destToSelExpr 5. Creates a mapping from a table alias to the lateral view
* AST's in aliasToLateralViews
- *
+ *
* @param ast
* @param qb
* @param ctx_1
@@ -996,7 +996,7 @@
* condition involves both subtrees and is not a equality. Also, we only
* support AND i.e ORs are not supported currently as their semantics are not
* very clear, may lead to data explosion and there is no usecase.
- *
+ *
* @param joinTree
* jointree to be populated
* @param joinCond
@@ -1140,7 +1140,7 @@
/**
* create a filter plan. The condition and the inputs are specified.
- *
+ *
* @param qb
* current query block
* @param condn
@@ -1898,7 +1898,7 @@
/**
* Returns the GenericUDAFInfo struct for the aggregation.
- *
+ *
* @param aggName
* The name of the UDAF.
* @param aggParameters
@@ -1962,7 +1962,7 @@
/**
* Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
* The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
- *
+ *
* @param mode
* The mode of the aggregation (PARTIAL1 or COMPLETE)
* @param genericUDAFEvaluators
@@ -2062,7 +2062,7 @@
/**
* Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
* The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
- *
+ *
* @param mode
* The mode of the aggregation (MERGEPARTIAL, PARTIAL2)
* @param genericUDAFEvaluators
@@ -2190,7 +2190,7 @@
* Generate the map-side GroupByOperator for the Query Block
* (qb.getParseInfo().getXXX(dest)). The new GroupByOperator will be a child
* of the inputOperatorInfo.
- *
+ *
* @param mode
* The mode of the aggregation (HASH)
* @param genericUDAFEvaluators
@@ -2303,10 +2303,10 @@
* Generate the ReduceSinkOperator for the Group By Query Block
* (qb.getPartInfo().getXXX(dest)). The new ReduceSinkOperator will be a child
* of inputOperatorInfo.
- *
+ *
* It will put all Group By keys and the distinct field (if any) in the
* map-reduce sort key, and all other fields in the map-reduce value.
- *
+ *
* @param numPartitionFields
* the number of fields for map-reduce partitioning. This is usually
* the number of fields in the Group By keys.
@@ -2425,10 +2425,10 @@
* Generate the second ReduceSinkOperator for the Group By Plan
* (parseInfo.getXXX(dest)). The new ReduceSinkOperator will be a child of
* groupByOperatorInfo.
- *
+ *
* The second ReduceSinkOperator will put the group by keys in the map-reduce
* sort key, and put the partial aggregation results in the map-reduce value.
- *
+ *
* @param numPartitionFields
* the number of fields in the map-reduce partition key. This should
* always be the same as the number of Group By keys. We should be
@@ -2498,7 +2498,7 @@
* Generate the second GroupByOperator for the Group By Plan
* (parseInfo.getXXX(dest)). The new GroupByOperator will do the second
* aggregation based on the partial aggregation results.
- *
+ *
* @param mode
* the mode of aggregation (FINAL)
* @param genericUDAFEvaluators
@@ -2587,26 +2587,26 @@
/**
* Generate a Group-By plan using a single map-reduce job (3 operators will be
* inserted):
- *
+ *
* ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP,
* A2_EXP) ) SortGroupBy (keys = (KEY.0,KEY.1), aggregations =
* (count_distinct(KEY.2), sum(VALUE.0), count(VALUE.1))) Select (final
* selects).
- *
+ *
* @param dest
* @param qb
* @param input
* @return
* @throws SemanticException
- *
+ *
* Generate a Group-By plan using 1 map-reduce job. Spray by the
* group by key, and sort by the distinct key (if any), and compute
* aggregates * The agggregation evaluation functions are as
* follows: Partitioning Key: grouping key
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key
* if DISTINCT
- *
+ *
* Reducer: iterate/merge (mode = COMPLETE)
**/
@SuppressWarnings({"unused", "nls"})
@@ -2643,30 +2643,30 @@
/**
* Generate a Multi Group-By plan using a 2 map-reduce jobs.
- *
+ *
* @param dest
* @param qb
* @param input
* @return
* @throws SemanticException
- *
+ *
* Generate a Group-By plan using a 2 map-reduce jobs. Spray by the
* distinct key in hope of getting a uniform distribution, and
* compute partial aggregates by the grouping key. Evaluate partial
* aggregates first, and spray by the grouping key to compute actual
* aggregates in the second phase. The agggregation evaluation
* functions are as follows: Partitioning Key: distinct key
- *
+ *
* Sorting Key: distinct key
- *
+ *
* Reducer: iterate/terminatePartial (mode = PARTIAL1)
- *
+ *
* STAGE 2
- *
+ *
* Partitioning Key: grouping key
- *
+ *
* Sorting Key: grouping key
- *
+ *
* Reducer: merge/terminate (mode = FINAL)
*/
@SuppressWarnings("nls")
@@ -2701,20 +2701,20 @@
/**
* Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be
* inserted):
- *
+ *
* ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP,
* A2_EXP) ) NOTE: If DISTINCT_EXP is null, partition by rand() SortGroupBy
* (keys = (KEY.0,KEY.1), aggregations = (count_distinct(KEY.2), sum(VALUE.0),
* count(VALUE.1))) ReduceSink ( keys = (0,1), values=(2,3,4)) SortGroupBy
* (keys = (KEY.0,KEY.1), aggregations = (sum(VALUE.0), sum(VALUE.1),
* sum(VALUE.2))) Select (final selects).
- *
+ *
* @param dest
* @param qb
* @param input
* @return
* @throws SemanticException
- *
+ *
* Generate a Group-By plan using a 2 map-reduce jobs. Spray by the
* grouping key and distinct key (or a random number, if no distinct
* is present) in hope of getting a uniform distribution, and
@@ -2724,19 +2724,19 @@
* phase. The agggregation evaluation functions are as follows:
* Partitioning Key: random() if no DISTINCT grouping + distinct key
* if DISTINCT
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key
* if DISTINCT
- *
+ *
* Reducer: iterate/terminatePartial (mode = PARTIAL1)
- *
+ *
* STAGE 2
- *
+ *
* Partitioning Key: grouping key
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key
* if DISTINCT
- *
+ *
* Reducer: merge/terminate (mode = FINAL)
*/
@SuppressWarnings("nls")
@@ -2800,15 +2800,15 @@
* we may turn off map-side partial aggregation based on its performance. Then
* spray by the group by key, and sort by the distinct key (if any), and
* compute aggregates based on actual aggregates
- *
+ *
* The agggregation evaluation functions are as follows: Mapper:
* iterate/terminatePartial (mode = HASH)
- *
+ *
* Partitioning Key: grouping key
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key if
* DISTINCT
- *
+ *
* Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode
* = MERGEPARTIAL)
*/
@@ -2864,23 +2864,23 @@
* key). Evaluate partial aggregates first, and spray by the grouping key to
* compute actual aggregates in the second phase. The agggregation evaluation
* functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
- *
+ *
* Partitioning Key: random() if no DISTINCT grouping + distinct key if
* DISTINCT
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key if
* DISTINCT
- *
+ *
* Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO
* DISTINCT (mode = MERGEPARTIAL)
- *
+ *
* STAGE 2
- *
+ *
* Partitioining Key: grouping key
- *
+ *
* Sorting Key: grouping key if no DISTINCT grouping + distinct key if
* DISTINCT
- *
+ *
* Reducer: merge/terminate (mode = FINAL)
*/
@SuppressWarnings("nls")
@@ -2890,7 +2890,7 @@
QBParseInfo parseInfo = qb.getParseInfo();
// ////// Generate GroupbyOperator for a map-side partial aggregation
- Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
+ Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
new LinkedHashMap<String, GenericUDAFEvaluator>();
GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator(
qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH,
@@ -2961,6 +2961,15 @@
return input;
}
+ private int getReducersBucketing(int totalFiles, int maxReducers) {
+ int numFiles = totalFiles/maxReducers;
+ while (true) {
+ if (totalFiles%numFiles == 0)
+ return totalFiles/numFiles;
+ numFiles++;
+ }
+ }
+
@SuppressWarnings("nls")
private Operator genFileSinkPlan(String dest, QB qb, Operator input)
throws SemanticException {
@@ -2975,11 +2984,47 @@
TableDesc table_desc = null;
int currentTableId = 0;
boolean isLocal = false;
+ boolean multiFileSpray = false;
+ int numFiles = 1;
+ int totalFiles = 1;
+ ArrayList<ExprNodeDesc> partnCols = null;
switch (dest_type.intValue()) {
case QBMetaData.DEST_TABLE: {
dest_tab = qbm.getDestTableForAlias(dest);
+
+ // If the table is bucketed, and bucketing is enforced, do the following:
+ // If the number of buckets is smaller than the number of maximum reducers,
+ // create those many reducers.
+ // If not, create a multiFileSink instead of FileSink - the multiFileSink will
+ // spray the data into multiple buckets. That way, we can support a very large
+ // number of buckets without needing a very large number of reducers.
+
+ if ((dest_tab.getNumBuckets() > 0) &&
+ (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+ int numBuckets = dest_tab.getNumBuckets();
+ if (numBuckets > maxReducers) {
+ multiFileSpray = true;
+ totalFiles = numBuckets;
+ if (totalFiles % maxReducers == 0) {
+ numFiles = totalFiles / maxReducers;
+ }
+ else {
+ // find the number of reducers such that it is a divisor of totalFiles
+ maxReducers = getReducersBucketing(totalFiles, maxReducers);
+ numFiles = totalFiles/maxReducers;
+ }
+ }
+ else {
+ maxReducers = numBuckets;
+ }
+
+ partnCols = getParitionColsFromBucketCols(dest_tab, input);
+ input = genReduceSinkPlanForBucketing(dest_tab, input, partnCols, maxReducers);
+ }
+
// check for partition
List<FieldSchema> parts = dest_tab.getPartitionKeys();
if (parts != null && parts.size() > 0) {
@@ -3007,6 +3052,31 @@
Partition dest_part = qbm.getDestPartitionForAlias(dest);
dest_tab = dest_part.getTable();
+
+ if ((dest_tab.getNumBuckets() > 0) &&
+ (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+ int numBuckets = dest_tab.getNumBuckets();
+ if (numBuckets > maxReducers) {
+ multiFileSpray = true;
+ totalFiles = numBuckets;
+ if (totalFiles % maxReducers == 0) {
+ numFiles = totalFiles / maxReducers;
+ }
+ else {
+ // find the number of reducers such that it is a divisor of totalFiles
+ maxReducers = getReducersBucketing(totalFiles, maxReducers);
+ numFiles = totalFiles/maxReducers;
+ }
+ }
+ else {
+ maxReducers = numBuckets;
+ }
+
+ partnCols = getParitionColsFromBucketCols(dest_tab, input);
+ input = genReduceSinkPlanForBucketing(dest_tab, input, partnCols, maxReducers);
+ }
+
dest_path = dest_part.getPath()[0];
queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
table_desc = Utilities.getTableDesc(dest_tab);
@@ -3161,7 +3231,8 @@
Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
new FileSinkDesc(queryTmpdir, table_desc, conf
- .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId),
+ .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId,
+ multiFileSpray, numFiles, totalFiles, partnCols),
fsRS, input), inputRR);
LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
@@ -3392,6 +3463,82 @@
return genLimitPlan(dest, qb, curr, limit);
}
+ private ArrayList<ExprNodeDesc> getParitionColsFromBucketCols(Table tab, Operator input) {
+ RowResolver inputRR = opParseCtx.get(input).getRR();
+ List<String> tabBucketCols = tab.getBucketCols();
+ List<FieldSchema> tabCols = tab.getCols();
+
+ // Partition by the bucketing column
+ ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
+ for (String bucketCol : tabBucketCols) {
+ int pos = 0;
+ for (FieldSchema tabCol : tabCols) {
+ if (bucketCol.equals(tabCol.getName())) {
+ ColumnInfo colInfo = inputRR.getColumnInfos().get(pos);
+ partitionCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsPartitionCol()));
+
+ break;
+ }
+ pos++;
+ }
+ }
+ return partitionCols;
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genReduceSinkPlanForBucketing(Table tab, Operator input, ArrayList<ExprNodeDesc> partitionCols,
+ int numReducers)
+ throws SemanticException {
+ RowResolver inputRR = opParseCtx.get(input).getRR();
+
+ ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+
+ // For the generation of the values expression just get the inputs
+ // signature and generate field expressions for those
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
+ valueCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsPartitionCol()));
+ colExprMap.put(colInfo.getInternalName(), valueCols
+ .get(valueCols.size() - 1));
+ }
+
+ ArrayList<String> outputColumns = new ArrayList<String>();
+ for (int i = 0; i < valueCols.size(); i++) {
+ outputColumns.add(getColumnInternalName(i));
+ }
+ Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+ .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
+ partitionCols, new String(), numReducers),
+ new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+ interim.setColumnExprMap(colExprMap);
+
+ // Add the extract operator to get the value fields
+ RowResolver out_rwsch = new RowResolver();
+ RowResolver interim_rwsch = inputRR;
+ Integer pos = Integer.valueOf(0);
+ for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
+ String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
+ out_rwsch.put(info[0], info[1], new ColumnInfo(
+ getColumnInternalName(pos), colInfo.getType(), info[0], false));
+ pos = Integer.valueOf(pos.intValue() + 1);
+ }
+
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
+ out_rwsch.getColumnInfos()), interim), out_rwsch);
+
+ LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() + " row schema: "
+ + out_rwsch.toString());
+ return output;
+
+ }
+
@SuppressWarnings("nls")
private Operator genReduceSinkPlan(String dest, QB qb, Operator input,
int numReducers) throws SemanticException {
@@ -3705,7 +3852,7 @@
/**
* Construct a selection operator for semijoin that filter out all fields
* other than the group by keys.
- *
+ *
* @param fields
* list of fields need to be output
* @param input
@@ -4735,9 +4882,9 @@
* expressions are provided on the TABLESAMPLE clause and the table has
* clustering columns defined in it's metadata. The predicate created has the
* following structure:
- *
+ *
* ((hash(expressions) & Integer.MAX_VALUE) % denominator) == numerator
- *
+ *
* @param ts
* TABLESAMPLE clause information
* @param bucketCols
@@ -5068,7 +5215,7 @@
/**
* Generates the operator DAG needed to implement lateral views and attaches
* it to the TS operator.
- *
+ *
* @param aliasToOpInfo
* A mapping from a table alias to the TS operator. This function
* replaces the operator mapping as necessary
@@ -5137,11 +5284,11 @@
* A helper function that gets all the columns and respective aliases in the
* source and puts them into dest. It renames the internal names of the
* columns based on getColumnInternalName(position).
- *
+ *
* Note that this helper method relies on RowResolver.getColumnInfos()
* returning the columns in the same order as they will be passed in the
* operator DAG.
- *
+ *
* @param source
* @param dest
* @param outputColNames
@@ -5703,7 +5850,7 @@
* Generates an expression node descriptor for the expression passed in the
* arguments. This function uses the row resolver and the metadata information
* that are passed as arguments to resolve the column names to internal names.
- *
+ *
* @param expr
* The expression
* @param input
@@ -5810,7 +5957,7 @@
* throws and exception in case the same column name is present in multiple
* table. The exception message indicates that the ambiguity could not be
* resolved.
- *
+ *
* @param qbm
* The metadata where the function looks for the table alias
* @param colName
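
The reducer-count logic duplicated in the DEST_TABLE and DEST_PARTITION branches above only calls getReducersBucketing when hive.exec.reducers.max does not divide the bucket count: it grows the per-reducer file count until it splits the buckets evenly. A standalone copy of that method with a couple of worked cases (the numbers are illustrative only; the guard numBuckets > maxReducers guarantees the initial numFiles is at least 1):

    public class ReducersBucketingSketch {
      // Same search as SemanticAnalyzer.getReducersBucketing: starting from
      // totalFiles / maxReducers files per reducer, grow the per-reducer file
      // count until it divides totalFiles evenly, then return the reducer count.
      static int getReducersBucketing(int totalFiles, int maxReducers) {
        int numFiles = totalFiles / maxReducers;
        while (true) {
          if (totalFiles % numFiles == 0) {
            return totalFiles / numFiles;
          }
          numFiles++;
        }
      }

      public static void main(String[] args) {
        // 100 buckets with at most 30 reducers -> 25 reducers, 4 files each.
        System.out.println(getReducersBucketing(100, 30));
        // 1000 buckets with at most 96 reducers -> 100 reducers, 10 files each
        // (the first even split can sit just above the configured maximum).
        System.out.println(getReducersBucketing(1000, 96));
      }
    }
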
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Fri Feb 19 00:58:28 2010
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.ArrayList;
/**
* FileSinkDesc.
@@ -33,17 +34,26 @@
private int destTableId;
private String compressCodec;
private String compressType;
+ private boolean multiFileSpray;
+ private int totalFiles;
+ private ArrayList<ExprNodeDesc> partitionCols;
+ private int numFiles;
public FileSinkDesc() {
}
public FileSinkDesc(final String dirName, final TableDesc tableInfo,
- final boolean compressed, int destTableId) {
+ final boolean compressed, final int destTableId, final boolean multiFileSpray,
+ final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols) {
this.dirName = dirName;
this.tableInfo = tableInfo;
this.compressed = compressed;
this.destTableId = destTableId;
+ this.multiFileSpray = multiFileSpray;
+ this.numFiles = numFiles;
+ this.totalFiles = totalFiles;
+ this.partitionCols = partitionCols;
}
public FileSinkDesc(final String dirName, final TableDesc tableInfo,
@@ -53,6 +63,10 @@
this.tableInfo = tableInfo;
this.compressed = compressed;
destTableId = 0;
+ this.multiFileSpray = false;
+ this.numFiles = 1;
+ this.totalFiles = 1;
+ this.partitionCols = null;
}
@Explain(displayName = "directory", normalExplain = false)
@@ -106,4 +120,63 @@
public void setCompressType(String intermediateCompressType) {
compressType = intermediateCompressType;
}
+
+ /**
+ * @return the multiFileSpray
+ */
+ @Explain(displayName = "MultiFileSpray", normalExplain = false)
+ public boolean isMultiFileSpray() {
+ return multiFileSpray;
+ }
+
+ /**
+ * @param multiFileSpray the multiFileSpray to set
+ */
+ public void setMultiFileSpray(boolean multiFileSpray) {
+ this.multiFileSpray = multiFileSpray;
+ }
+
+ /**
+ * @return the totalFiles
+ */
+ @Explain(displayName = "TotalFiles", normalExplain = false)
+ public int getTotalFiles() {
+ return totalFiles;
+ }
+
+ /**
+ * @param totalFiles the totalFiles to set
+ */
+ public void setTotalFiles(int totalFiles) {
+ this.totalFiles = totalFiles;
+ }
+
+ /**
+ * @return the partitionCols
+ */
+ public ArrayList<ExprNodeDesc> getPartitionCols() {
+ return partitionCols;
+ }
+
+ /**
+ * @param partitionCols the partitionCols to set
+ */
+ public void setPartitionCols(ArrayList<ExprNodeDesc> partitionCols) {
+ this.partitionCols = partitionCols;
+ }
+
+ /**
+ * @return the numFiles
+ */
+ @Explain(displayName = "NumFilesPerFileSink", normalExplain = false)
+ public int getNumFiles() {
+ return numFiles;
+ }
+
+ /**
+ * @param numFiles the numFiles to set
+ */
+ public void setNumFiles(int numFiles) {
+ this.numFiles = numFiles;
+ }
}
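
The extended constructor above is what SemanticAnalyzer.genFileSinkPlan now calls; the shorter constructor keeps the old single-file behaviour by defaulting multiFileSpray to false and the file counts to 1. A hypothetical direct use of the new signature follows; the output directory, TableDesc and partition-column list are placeholders, not values taken from the patch:

    import java.util.ArrayList;

    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;

    public class FileSinkDescSketch {
      public static void main(String[] args) {
        TableDesc tableInfo = new TableDesc();  // placeholder table description
        ArrayList<ExprNodeDesc> partnCols = new ArrayList<ExprNodeDesc>();  // bucketing columns

        // 1000 buckets written by 100 reducers -> each FileSinkOperator sprays 10 files.
        FileSinkDesc desc = new FileSinkDesc("/tmp/query-output", tableInfo,
            /* compressed */ false, /* destTableId */ 1,
            /* multiFileSpray */ true, /* numFiles */ 10, /* totalFiles */ 1000, partnCols);

        System.out.println("files per sink: " + desc.getNumFiles()
            + ", total: " + desc.getTotalFiles()
            + ", spray: " + desc.isMultiFileSpray());
      }
    }
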
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,16 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 200;
+
+drop table bucket1_1;
+CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS;
+
+explain extended
+insert overwrite table bucket1_1
+select * from src;
+
+insert overwrite table bucket1_1
+select * from src;
+
+select * from bucket1_1 order by key;
+
+drop table bucket1_1;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,19 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+
+drop table bucket2_1;
+CREATE TABLE bucket2_1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
+
+explain extended
+insert overwrite table bucket2_1
+select * from src;
+
+insert overwrite table bucket2_1
+select * from src;
+
+explain
+select * from bucket2_1 tablesample (bucket 1 out of 2) s order by key;
+
+select * from bucket2_1 tablesample (bucket 1 out of 2) s order by key;
+
+drop table bucket2_1;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,22 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+
+drop table bucket3_1;
+CREATE TABLE bucket3_1(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS;
+
+explain extended
+insert overwrite table bucket3_1 partition (ds='1')
+select * from src;
+
+insert overwrite table bucket3_1 partition (ds='1')
+select * from src;
+
+insert overwrite table bucket3_1 partition (ds='2')
+select * from src;
+
+explain
+select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1' order by key;
+
+select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1' order by key;
+
+drop table bucket3_1;
Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out Fri Feb 19 00:58:28 2010
@@ -92,7 +92,8 @@
File Output Operator
compressed: false
GlobalTableId: 1
- directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
+ directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
+ NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -102,20 +103,22 @@
columns.types string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
name dest1
serialization.ddl struct dest1 { string mydata}
serialization.format 1
serialization.last.column.takes.rest true
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041758
+ transient_lastDdlTime 1266449090
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
+ TotalFiles: 1
+ MultiFileSpray: false
Needs Tagging: false
Path -> Alias:
- file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src [src]
+ file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src [src]
Path -> Partition:
- file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src
+ file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
Partition
base file name: src
input format: org.apache.hadoop.mapred.TextInputFormat
@@ -126,12 +129,12 @@
columns.types string:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
name src
serialization.ddl struct src { string key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041757
+ transient_lastDdlTime 1266449089
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
@@ -142,12 +145,12 @@
columns.types string:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
name src
serialization.ddl struct src { string key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041757
+ transient_lastDdlTime 1266449089
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: src
name: src
@@ -159,14 +162,14 @@
Move Operator
files:
hdfs directory: true
- source: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
- destination: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+ source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
+ destination: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
Stage: Stage-0
Move Operator
tables:
replace: true
- source: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+ source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -176,21 +179,21 @@
columns.types string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
name dest1
serialization.ddl struct dest1 { string mydata}
serialization.format 1
serialization.last.column.takes.rest true
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041758
+ transient_lastDdlTime 1266449090
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
- tmp directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10001
+ tmp directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10001
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
- file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
+ file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
Reduce Output Operator
sort order:
Map-reduce partition columns:
@@ -202,9 +205,9 @@
type: string
Needs Tagging: false
Path -> Alias:
- file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002 [file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002]
+ file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002 [file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002]
Path -> Partition:
- file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
+ file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
Partition
base file name: 10002
input format: org.apache.hadoop.mapred.TextInputFormat
@@ -215,13 +218,13 @@
columns.types string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
name dest1
serialization.ddl struct dest1 { string mydata}
serialization.format 1
serialization.last.column.takes.rest true
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041758
+ transient_lastDdlTime 1266449090
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
@@ -232,13 +235,13 @@
columns.types string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
name dest1
serialization.ddl struct dest1 { string mydata}
serialization.format 1
serialization.last.column.takes.rest true
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041758
+ transient_lastDdlTime 1266449090
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
name: dest1
@@ -247,7 +250,8 @@
File Output Operator
compressed: false
GlobalTableId: 0
- directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+ directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
+ NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -257,15 +261,17 @@
columns.types string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
- location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
name dest1
serialization.ddl struct dest1 { string mydata}
serialization.format 1
serialization.last.column.takes.rest true
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- transient_lastDdlTime 1266041758
+ transient_lastDdlTime 1266449090
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
+ TotalFiles: 1
+ MultiFileSpray: false
PREHOOK: query: INSERT OVERWRITE TABLE dest1
@@ -300,12 +306,12 @@
SELECT * FROM dest1
PREHOOK: type: QUERY
PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-16-03_636_3562459403060191123/10000
+PREHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-54_510_6167292116608425875/10000
POSTHOOK: query: -- Test the result
SELECT * FROM dest1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-16-03_636_3562459403060191123/10000
+POSTHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-54_510_6167292116608425875/10000
238 val_238
86 val_86
311 val_311
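
[Editor's note] The hunks above show the per-sink fields this patch adds to EXPLAIN EXTENDED output (NumFilesPerFileSink, TotalFiles, MultiFileSpray); for the non-bucketed dest1 sink they remain 1, 1, and false. Conceptually, these fields describe whether a single file sink has to spread its rows over more than one bucket file, which happens when bucketing is enforced with fewer reducers than buckets. A minimal, hypothetical sketch of that idea, assuming each sink simply folds bucket numbers onto its own files (this is not the committed FileSinkOperator logic):

    // Illustrative sketch only: how one sink might choose among several
    // output files when a reducer must produce more than one bucket file.
    // The mapping below is an assumption for illustration, not Hive's code.
    public class MultiFileSpraySketch {

        /** Pick a file index within one sink for a row's bucket number. */
        static int fileIndexFor(int bucket, int numFilesPerSink) {
            // Assumption: the bucket number is folded onto this sink's files.
            return bucket % numFilesPerSink;
        }

        public static void main(String[] args) {
            int numBuckets = 100;  // e.g. CLUSTERED BY (key) INTO 100 BUCKETS
            int reducers = 25;     // fewer reducers than buckets (assumed)
            int numFilesPerSink = (numBuckets + reducers - 1) / reducers;  // 4
            boolean multiFileSpray = numFilesPerSink > 1;
            System.out.println("filesPerSink=" + numFilesPerSink
                + " multiFileSpray=" + multiFileSpray);
            for (int bucket = 0; bucket < 8; bucket++) {
                System.out.println("bucket " + bucket + " -> file "
                    + fileIndexFor(bucket, numFilesPerSink));
            }
        }
    }
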
Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out Fri Feb 19 00:58:28 2010
@@ -0,0 +1,673 @@
+PREHOOK: query: drop table bucket1_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket1_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket1_1
+PREHOOK: query: explain extended
+insert overwrite table bucket1_1
+select * from src
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table bucket1_1
+select * from src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB bucket1_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ sort order:
+ Map-reduce partition columns:
+ expr: _col0
+ type: string
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Needs Tagging: false
+ Path -> Alias:
+ file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src [src]
+ Path -> Partition:
+ file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
+ Partition
+ base file name: src
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
+ name src
+ serialization.ddl struct src { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1266535134
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
+ name src
+ serialization.ddl struct src { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1266535134
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: src
+ name: src
+ Reduce Operator Tree:
+ Extract
+ Select Operator
+ expressions:
+ expr: UDFToInteger(_col0)
+ type: int
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10000
+ NumFilesPerFileSink: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count 100
+ bucket_field_name key
+ columns key,value
+ columns.types int:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/bucket1_1
+ name bucket1_1
+ serialization.ddl struct bucket1_1 { i32 key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1266535134
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: bucket1_1
+ TotalFiles: 1
+ MultiFileSpray: false
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10000
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count 100
+ bucket_field_name key
+ columns key,value
+ columns.types int:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/bucket1_1
+ name bucket1_1
+ serialization.ddl struct bucket1_1 { i32 key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1266535134
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: bucket1_1
+ tmp directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10001
+
+
+PREHOOK: query: insert overwrite table bucket1_1
+select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@bucket1_1
+POSTHOOK: query: insert overwrite table bucket1_1
+select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@bucket1_1
+PREHOOK: query: select * from bucket1_1 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket1_1
+PREHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-59_053_2682513110201565086/10000
+POSTHOOK: query: select * from bucket1_1 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket1_1
+POSTHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-59_053_2682513110201565086/10000
+0 val_0
+0 val_0
+0 val_0
+2 val_2
+4 val_4
+5 val_5
+5 val_5
+5 val_5
+8 val_8
+9 val_9
+10 val_10
+11 val_11
+12 val_12
+12 val_12
+15 val_15
+15 val_15
+17 val_17
+18 val_18
+18 val_18
+19 val_19
+20 val_20
+24 val_24
+24 val_24
+26 val_26
+26 val_26
+27 val_27
+28 val_28
+30 val_30
+33 val_33
+34 val_34
+35 val_35
+35 val_35
+35 val_35
+37 val_37
+37 val_37
+41 val_41
+42 val_42
+42 val_42
+43 val_43
+44 val_44
+47 val_47
+51 val_51
+51 val_51
+53 val_53
+54 val_54
+57 val_57
+58 val_58
+58 val_58
+64 val_64
+65 val_65
+66 val_66
+67 val_67
+67 val_67
+69 val_69
+70 val_70
+70 val_70
+70 val_70
+72 val_72
+72 val_72
+74 val_74
+76 val_76
+76 val_76
+77 val_77
+78 val_78
+80 val_80
+82 val_82
+83 val_83
+83 val_83
+84 val_84
+84 val_84
+85 val_85
+86 val_86
+87 val_87
+90 val_90
+90 val_90
+90 val_90
+92 val_92
+95 val_95
+95 val_95
+96 val_96
+97 val_97
+97 val_97
+98 val_98
+98 val_98
+100 val_100
+100 val_100
+103 val_103
+103 val_103
+104 val_104
+104 val_104
+105 val_105
+111 val_111
+113 val_113
+113 val_113
+114 val_114
+116 val_116
+118 val_118
+118 val_118
+119 val_119
+119 val_119
+119 val_119
+120 val_120
+120 val_120
+125 val_125
+125 val_125
+126 val_126
+128 val_128
+128 val_128
+128 val_128
+129 val_129
+129 val_129
+131 val_131
+133 val_133
+134 val_134
+134 val_134
+136 val_136
+137 val_137
+137 val_137
+138 val_138
+138 val_138
+138 val_138
+138 val_138
+143 val_143
+145 val_145
+146 val_146
+146 val_146
+149 val_149
+149 val_149
+150 val_150
+152 val_152
+152 val_152
+153 val_153
+155 val_155
+156 val_156
+157 val_157
+158 val_158
+160 val_160
+162 val_162
+163 val_163
+164 val_164
+164 val_164
+165 val_165
+165 val_165
+166 val_166
+167 val_167
+167 val_167
+167 val_167
+168 val_168
+169 val_169
+169 val_169
+169 val_169
+169 val_169
+170 val_170
+172 val_172
+172 val_172
+174 val_174
+174 val_174
+175 val_175
+175 val_175
+176 val_176
+176 val_176
+177 val_177
+178 val_178
+179 val_179
+179 val_179
+180 val_180
+181 val_181
+183 val_183
+186 val_186
+187 val_187
+187 val_187
+187 val_187
+189 val_189
+190 val_190
+191 val_191
+191 val_191
+192 val_192
+193 val_193
+193 val_193
+193 val_193
+194 val_194
+195 val_195
+195 val_195
+196 val_196
+197 val_197
+197 val_197
+199 val_199
+199 val_199
+199 val_199
+200 val_200
+200 val_200
+201 val_201
+202 val_202
+203 val_203
+203 val_203
+205 val_205
+205 val_205
+207 val_207
+207 val_207
+208 val_208
+208 val_208
+208 val_208
+209 val_209
+209 val_209
+213 val_213
+213 val_213
+214 val_214
+216 val_216
+216 val_216
+217 val_217
+217 val_217
+218 val_218
+219 val_219
+219 val_219
+221 val_221
+221 val_221
+222 val_222
+223 val_223
+223 val_223
+224 val_224
+224 val_224
+226 val_226
+228 val_228
+229 val_229
+229 val_229
+230 val_230
+230 val_230
+230 val_230
+230 val_230
+230 val_230
+233 val_233
+233 val_233
+235 val_235
+237 val_237
+237 val_237
+238 val_238
+238 val_238
+239 val_239
+239 val_239
+241 val_241
+242 val_242
+242 val_242
+244 val_244
+247 val_247
+248 val_248
+249 val_249
+252 val_252
+255 val_255
+255 val_255
+256 val_256
+256 val_256
+257 val_257
+258 val_258
+260 val_260
+262 val_262
+263 val_263
+265 val_265
+265 val_265
+266 val_266
+272 val_272
+272 val_272
+273 val_273
+273 val_273
+273 val_273
+274 val_274
+275 val_275
+277 val_277
+277 val_277
+277 val_277
+277 val_277
+278 val_278
+278 val_278
+280 val_280
+280 val_280
+281 val_281
+281 val_281
+282 val_282
+282 val_282
+283 val_283
+284 val_284
+285 val_285
+286 val_286
+287 val_287
+288 val_288
+288 val_288
+289 val_289
+291 val_291
+292 val_292
+296 val_296
+298 val_298
+298 val_298
+298 val_298
+302 val_302
+305 val_305
+306 val_306
+307 val_307
+307 val_307
+308 val_308
+309 val_309
+309 val_309
+310 val_310
+311 val_311
+311 val_311
+311 val_311
+315 val_315
+316 val_316
+316 val_316
+316 val_316
+317 val_317
+317 val_317
+318 val_318
+318 val_318
+318 val_318
+321 val_321
+321 val_321
+322 val_322
+322 val_322
+323 val_323
+325 val_325
+325 val_325
+327 val_327
+327 val_327
+327 val_327
+331 val_331
+331 val_331
+332 val_332
+333 val_333
+333 val_333
+335 val_335
+336 val_336
+338 val_338
+339 val_339
+341 val_341
+342 val_342
+342 val_342
+344 val_344
+344 val_344
+345 val_345
+348 val_348
+348 val_348
+348 val_348
+348 val_348
+348 val_348
+351 val_351
+353 val_353
+353 val_353
+356 val_356
+360 val_360
+362 val_362
+364 val_364
+365 val_365
+366 val_366
+367 val_367
+367 val_367
+368 val_368
+369 val_369
+369 val_369
+369 val_369
+373 val_373
+374 val_374
+375 val_375
+377 val_377
+378 val_378
+379 val_379
+382 val_382
+382 val_382
+384 val_384
+384 val_384
+384 val_384
+386 val_386
+389 val_389
+392 val_392
+393 val_393
+394 val_394
+395 val_395
+395 val_395
+396 val_396
+396 val_396
+396 val_396
+397 val_397
+397 val_397
+399 val_399
+399 val_399
+400 val_400
+401 val_401
+401 val_401
+401 val_401
+401 val_401
+401 val_401
+402 val_402
+403 val_403
+403 val_403
+403 val_403
+404 val_404
+404 val_404
+406 val_406
+406 val_406
+406 val_406
+406 val_406
+407 val_407
+409 val_409
+409 val_409
+409 val_409
+411 val_411
+413 val_413
+413 val_413
+414 val_414
+414 val_414
+417 val_417
+417 val_417
+417 val_417
+418 val_418
+419 val_419
+421 val_421
+424 val_424
+424 val_424
+427 val_427
+429 val_429
+429 val_429
+430 val_430
+430 val_430
+430 val_430
+431 val_431
+431 val_431
+431 val_431
+432 val_432
+435 val_435
+436 val_436
+437 val_437
+438 val_438
+438 val_438
+438 val_438
+439 val_439
+439 val_439
+443 val_443
+444 val_444
+446 val_446
+448 val_448
+449 val_449
+452 val_452
+453 val_453
+454 val_454
+454 val_454
+454 val_454
+455 val_455
+457 val_457
+458 val_458
+458 val_458
+459 val_459
+459 val_459
+460 val_460
+462 val_462
+462 val_462
+463 val_463
+463 val_463
+466 val_466
+466 val_466
+466 val_466
+467 val_467
+468 val_468
+468 val_468
+468 val_468
+468 val_468
+469 val_469
+469 val_469
+469 val_469
+469 val_469
+469 val_469
+470 val_470
+472 val_472
+475 val_475
+477 val_477
+478 val_478
+478 val_478
+479 val_479
+480 val_480
+480 val_480
+480 val_480
+481 val_481
+482 val_482
+483 val_483
+484 val_484
+485 val_485
+487 val_487
+489 val_489
+489 val_489
+489 val_489
+489 val_489
+490 val_490
+491 val_491
+492 val_492
+492 val_492
+493 val_493
+494 val_494
+495 val_495
+496 val_496
+497 val_497
+498 val_498
+498 val_498
+498 val_498
+PREHOOK: query: drop table bucket1_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket1_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@bucket1_1