Posted to commits@hive.apache.org by sd...@apache.org on 2011/06/28 01:40:53 UTC
svn commit: r1140377 [1/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/or...
Author: sdong
Date: Mon Jun 27 23:40:52 2011
New Revision: 1140377
URL: http://svn.apache.org/viewvc?rev=1140377&view=rev
Log:
HIVE-2035. Use block-level merge for RCFile if merging of intermediate results is needed (FranklinHu via Siying Dong)
Added:
hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q
hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q
hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q
hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q
hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q
hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge4.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jun 27 23:40:52 2011
@@ -319,6 +319,11 @@ public class HiveConf extends Configurat
HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000)),
HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000)),
+ HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true),
+ HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
+ "org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat"),
+ HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
+ "hive.merge.current.job.has.dynamic.partitions", false),
HIVESKEWJOIN("hive.optimize.skewjoin", false),
HIVECONVERTJOIN("hive.auto.convert.join", false),
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Jun 27 23:40:52 2011
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -74,6 +76,8 @@ public final class TaskFactory {
MapredLocalTask.class));
taskvec.add(new taskTuple<StatsWork>(StatsWork.class,
StatsTask.class));
+ taskvec.add(new taskTuple<MergeWork>(MergeWork.class,
+ BlockMergeTask.class));
}
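With the MergeWork/BlockMergeTask tuple registered in taskvec, TaskFactory can map a MergeWork plan object to its executing task the same way it already does for MapredWork or StatsWork. A rough usage sketch (illustrative; inputPaths, outputDir and conf are assumed to exist in the caller):

    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;

    // Given some input directories to concatenate and a target directory:
    MergeWork mergeWork = new MergeWork(inputPaths, outputDir);   // List<String>, String
    // The factory looks up the registered tuple and returns a BlockMergeTask
    // initialized with this MergeWork.
    Task<MergeWork> mergeTask = TaskFactory.get(mergeWork, conf); // conf is a HiveConf
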
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Jun 27 23:40:52 2011
@@ -49,11 +49,11 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.LogManager;
@@ -74,7 +74,7 @@ public class BlockMergeTask extends Task
job = new JobConf(conf, BlockMergeTask.class);
jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
}
-
+
public boolean requireLock() {
return true;
}
@@ -91,7 +91,7 @@ public class BlockMergeTask extends Task
success = true;
ShimLoader.getHadoopShims().setNullOutputFormat(job);
job.setMapperClass(work.getMapperClass());
-
+
Context ctx = driverContext.getCtx();
boolean ctxCreated = false;
try {
@@ -109,9 +109,9 @@ public class BlockMergeTask extends Task
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
if(work.getNumMapTasks() != null) {
- job.setNumMapTasks(work.getNumMapTasks());
+ job.setNumMapTasks(work.getNumMapTasks());
}
-
+
// zero reducers
job.setNumReduceTasks(0);
@@ -145,18 +145,22 @@ public class BlockMergeTask extends Task
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
+ HiveConf.setBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
+ work.hasDynamicPartitions());
+
int returnVal = 0;
RunningJob rj = null;
boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
HiveConf.ConfVars.HADOOPJOBNAME));
-
+
String jobName = null;
if (noName && this.getQueryPlan() != null) {
int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(),
maxlen - 6);
}
-
+
if (noName) {
// This is for a special case to ensure unit tests pass
HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME,
@@ -165,7 +169,7 @@ public class BlockMergeTask extends Task
try {
addInputPaths(job, work);
-
+
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
@@ -265,7 +269,7 @@ public class BlockMergeTask extends Task
if (paths == null || paths.length == 0) {
printUsage();
}
-
+
FileSystem fs = null;
JobConf conf = new JobConf(BlockMergeTask.class);
HiveConf hiveConf = new HiveConf(conf, BlockMergeTask.class);
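Besides the whitespace cleanup, the substantive change in BlockMergeTask is that the driver now records on the JobConf whether this merge covers dynamic partitions, so every merge mapper can pick the flag up in configure(). The two halves of that round trip, both taken from this commit and shown together here only for readability:

    // Driver side, BlockMergeTask.execute(): publish the flag into the job conf.
    HiveConf.setBoolVar(job,
        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
        work.hasDynamicPartitions());

    // Mapper side, RCFileMergeMapper.configure(): read the same flag back.
    hasDynamicPartitions = HiveConf.getBoolVar(job,
        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
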
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Mon Jun 27 23:40:52 2011
@@ -19,34 +19,50 @@
package org.apache.hadoop.hive.ql.io.rcfile.merge;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.Mapper;
+@Explain(displayName = "Block level merge")
public class MergeWork extends MapredWork implements Serializable {
private static final long serialVersionUID = 1L;
private List<String> inputPaths;
private String outputDir;
+ private boolean hasDynamicPartitions;
public MergeWork() {
}
-
+
public MergeWork(List<String> inputPaths, String outputDir) {
+ this(inputPaths, outputDir, false);
+ }
+
+ public MergeWork(List<String> inputPaths, String outputDir,
+ boolean hasDynamicPartitions) {
super();
this.inputPaths = inputPaths;
this.outputDir = outputDir;
+ this.hasDynamicPartitions = hasDynamicPartitions;
PartitionDesc partDesc = new PartitionDesc();
partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
if(this.getPathToPartitionInfo() == null) {
this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
}
+ if(this.getNumReduceTasks() == null) {
+ this.setNumReduceTasks(0);
+ }
for(String path: this.inputPaths) {
this.getPathToPartitionInfo().put(path, partDesc);
}
@@ -84,4 +100,30 @@ public class MergeWork extends MapredWor
return false;
}
+ public boolean hasDynamicPartitions() {
+ return this.hasDynamicPartitions;
+ }
+
+ public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+ this.hasDynamicPartitions = hasDynamicPartitions;
+ }
+
+ @Override
+ public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+ TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+
+ String inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ try {
+ partDesc.setInputFileFormatClass((Class <? extends InputFormat>)
+ Class.forName(inputFormatClass));
+ } catch (ClassNotFoundException e) {
+ String msg = "Merge input format class not found";
+ throw new RuntimeException(msg);
+ }
+ super.resolveDynamicPartitionMerge(conf, path, tblDesc, aliases, partDesc);
+
+ // Add the DP path to the list of input paths
+ inputPaths.add(path.toString());
+ }
+
}
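MergeWork now distinguishes the static and dynamic-partition cases: with static partitions the single temp directory is known at compile time, while with dynamic partitions the input list starts empty and ConditionalResolverMergeFiles adds one directory per partition at run time through resolveDynamicPartitionMerge(). A sketch of the two constructions (the path literal and finalName are illustrative placeholders):

    import java.util.ArrayList;
    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;

    // Static partition / unpartitioned output: one known temp directory.
    MergeWork staticMerge =
        new MergeWork(Arrays.asList("/tmp/hive-scratch/-ext-10000"), finalName);

    // Dynamic partitions: no inputs yet; resolveDynamicPartitionMerge() will add
    // each partition directory (and force the block-merge input format on it)
    // when the conditional task is resolved.
    MergeWork dpMerge =
        new MergeWork(new ArrayList<String>(), finalName, true);
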
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java Mon Jun 27 23:40:52 2011
@@ -36,11 +36,12 @@ public class RCFileBlockMergeRecordReade
private final long start;
private final long end;
private boolean more = true;
+ private final Path path;
protected Configuration conf;
public RCFileBlockMergeRecordReader(Configuration conf, FileSplit split)
throws IOException {
- Path path = split.getPath();
+ path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
this.in = new RCFile.Reader(fs, path, conf);
this.end = split.getStart() + split.getLength();
@@ -87,12 +88,13 @@ public class RCFileBlockMergeRecordReade
if (!more) {
return false;
}
-
+
keyWrapper.keyBuffer = this.in.getCurrentKeyBufferObj();
keyWrapper.recordLength = this.in.getCurrentBlockLength();
keyWrapper.keyLength = this.in.getCurrentKeyLength();
keyWrapper.compressedKeyLength = this.in.getCurrentCompressedKeyLen();
keyWrapper.codec = this.in.getCompressionCodec();
+ keyWrapper.inputPath = path;
valueWrapper.valueBuffer = this.in.getCurrentValueBufferObj();
@@ -106,7 +108,7 @@ public class RCFileBlockMergeRecordReade
/**
* Return the progress within the input split.
- *
+ *
* @return 0.0 to 1.0 of the input byte range
*/
public float getProgress() throws IOException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Mon Jun 27 23:40:52 2011
@@ -22,6 +22,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -33,7 +34,8 @@ public class RCFileKeyBufferWrapper impl
protected int recordLength;
protected int keyLength;
protected int compressedKeyLength;
-
+ protected Path inputPath;
+
protected CompressionCodec codec;
protected RCFileKeyBufferWrapper() {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Mon Jun 27 23:40:52 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -57,6 +58,11 @@ public class RCFileMergeMapper extends M
CompressionCodec codec = null;
int columnNumber = 0;
+ boolean hasDynamicPartitions = false;
+ boolean tmpPathFixed = false;
+ Path tmpPath;
+ Path dpPath;
+
public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
public RCFileMergeMapper() {
@@ -64,12 +70,12 @@ public class RCFileMergeMapper extends M
public void configure(JobConf job) {
jc = job;
+ hasDynamicPartitions = HiveConf.getBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+
String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
.toString();
- Path tmpPath = Utilities.toTempPath(specPath);
- String taskId = Utilities.getTaskId(job);
- finalPath = new Path(tmpPath, taskId);
- outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ updatePaths(Utilities.toTempPath(specPath));
try {
fs = (new Path(specPath)).getFileSystem(job);
autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
@@ -80,6 +86,13 @@ public class RCFileMergeMapper extends M
}
}
+ private void updatePaths(Path tmpPath) {
+ String taskId = Utilities.getTaskId(jc);
+ this.tmpPath = tmpPath;
+ finalPath = new Path(tmpPath, taskId);
+ outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ }
+
@Override
public void map(Object k, RCFileValueBufferWrapper value,
OutputCollector<Object, Object> output, Reporter reporter)
@@ -93,6 +106,16 @@ public class RCFileMergeMapper extends M
key = (RCFileKeyBufferWrapper) k;
}
+ if (hasDynamicPartitions) {
+ if (tmpPathFixed) {
+ checkPartitionsMatch(key.inputPath.getParent());
+ } else {
+ // We haven't fixed the TMP path for this mapper yet
+ fixTmpPath(key.inputPath.getParent());
+ tmpPathFixed = true;
+ }
+ }
+
if (outWriter == null) {
codec = key.codec;
columnNumber = key.keyBuffer.getColumnNumber();
@@ -117,6 +140,46 @@ public class RCFileMergeMapper extends M
}
}
+ /**
+ * Validates that each input path belongs to the same partition
+ * since each mapper merges the input to a single output directory
+ *
+ * @param inputPath
+ * @throws HiveException
+ */
+ private void checkPartitionsMatch(Path inputPath) throws HiveException {
+ if (!dpPath.equals(inputPath)) {
+ // Temp partition input path does not match the existing temp path
+ String msg = "Multiple partitions for one block merge mapper: " +
+ dpPath + " NOT EQUAL TO " + inputPath;
+ LOG.error(msg);
+ throw new HiveException(msg);
+ }
+ }
+
+ /**
+ * Fixes tmpPath to point to the correct partition.
+ * Before this is called, tmpPath will default to the root tmp table dir
+ *
+ * @param inputPath
+ * @throws HiveException
+ */
+ private void fixTmpPath(Path inputPath)
+ throws HiveException {
+ dpPath = inputPath;
+ Path newPath = new Path(".");
+ int inputDepth = inputPath.depth();
+ int tmpDepth = tmpPath.depth();
+
+ // Build the path from bottom up
+ while (inputPath != null && inputPath.depth() > tmpDepth) {
+ newPath = new Path(inputPath.getName(), newPath);
+ inputDepth--;
+ inputPath = inputPath.getParent();
+ }
+ updatePaths(new Path(tmpPath, newPath));
+ }
+
public void close() throws IOException {
// close writer
if (outWriter == null) {
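For dynamic partitions each merge mapper checks, on its first record, which partition directory its split came from (key.inputPath, added to RCFileKeyBufferWrapper above) and re-targets its output underneath the matching subdirectory of the temp output; any later record from a different partition triggers a HiveException in checkPartitionsMatch(). A rough trace of fixTmpPath() with made-up paths:

    // Illustrative values only:
    //   tmpPath   = /warehouse/t/_tmp.-ext-10000             (3 path elements)
    //   inputPath = /warehouse/t/-ext-10002/ds=1/part=3      (5 path elements)
    // The loop walks inputPath upward until it is as shallow as tmpPath,
    // prepending each directory name it peels off:
    //   newPath: "."  ->  "part=3"  ->  "ds=1/part=3"
    // updatePaths(new Path(tmpPath, newPath)) then makes this mapper write to
    //   /warehouse/t/_tmp.-ext-10000/ds=1/part=3/<taskId>
    // so the merged file for each dynamic partition lands in its own directory.
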
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Jun 27 23:40:52 2011
@@ -44,6 +44,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -71,8 +74,8 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
/**
* Processor for the rule - table scan followed by reduce sink.
@@ -313,7 +316,7 @@ public class GenMRFileSink1 implements N
* directories.
*
*/
- private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) {
+ private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) throws SemanticException {
//
// 1. create the operator tree
@@ -374,8 +377,31 @@ public class GenMRFileSink1 implements N
MapRedTask currTask = (MapRedTask) ctx.getCurrTask();
MoveWork dummyMv = new MoveWork(null, null, null,
new LoadFileDesc(fsInputDesc.getDirName(), finalName, true, null, null), false);
- MapredWork cplan = createMergeTask(ctx.getConf(), tsMerge, fsInputDesc);
- // use CombineHiveInputFormat for map-only merging
+ MapredWork cplan;
+
+ if(parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
+ HIVEMERGERCFILEBLOCKLEVEL) &&
+ fsInputDesc.getTableInfo().getInputFileFormatClass().
+ equals(RCFileInputFormat.class)) {
+
+ // Check if InputFormatClass is valid
+ String inputFormatClass = parseCtx.getConf().
+ getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ try {
+ Class c = (Class <? extends InputFormat>) Class.forName(inputFormatClass);
+
+ LOG.info("RCFile format- Using block level merge");
+ cplan = createRCFileMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ } catch (ClassNotFoundException e) {
+ String msg = "Illegal input format class: " + inputFormatClass;
+ throw new SemanticException(msg);
+ }
+
+ } else {
+ cplan = createMergeTask(ctx.getConf(), tsMerge, fsInputDesc);
+ // use CombineHiveInputFormat for map-only merging
+ }
cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
// NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
// know if merge MR2 will be triggered at execution time
@@ -434,6 +460,46 @@ public class GenMRFileSink1 implements N
return cplan;
}
+
+ /**
+ * Create a block level merge task for RCFiles.
+ * @param fsInputDesc
+ * @param finalName
+ * @return MergeWork if the table is stored as RCFile
+ * @throws SemanticException if the table is not stored as RCFile
+ */
+ private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+ String finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+ String inputDir = fsInputDesc.getDirName();
+ TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+ if(tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ ArrayList<String> inputDirs = new ArrayList<String>();
+ if (!hasDynamicPartitions) {
+ inputDirs.add(inputDir);
+ }
+
+ MergeWork work = new MergeWork(inputDirs, finalName,
+ hasDynamicPartitions);
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ work.setAliasToWork(
+ new LinkedHashMap<String, Operator<? extends Serializable>>());
+ if (hasDynamicPartitions) {
+ work.getPathToPartitionInfo().put(inputDir,
+ new PartitionDesc(tblDesc, null));
+ }
+
+ return work;
+ }
+
+ throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+ }
+
/**
* Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
* @param conf HiveConf
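The merge plan is now chosen per table format: if hive.merge.rcfile.block.level is on and the file sink's table is stored as RCFile, the conditional merge task is built as a MergeWork, which concatenates whole compressed RCFile blocks instead of re-reading rows; otherwise the pre-existing row-level map-only merge is built as before. Condensed, the branch added above amounts to the following (validation of the configured input format class omitted; variable names as in createMap4Merge):

    boolean blockMerge =
        parseCtx.getConf().getBoolVar(HiveConf.ConfVars.HIVEMERGERCFILEBLOCKLEVEL)
        && fsInputDesc.getTableInfo().getInputFileFormatClass()
               .equals(RCFileInputFormat.class);

    MapredWork cplan = blockMerge
        ? createRCFileMergeTask(fsInputDesc, finalName,
              dpCtx != null && dpCtx.getNumDPCols() > 0)   // dynamic partitions?
        : createMergeTask(ctx.getConf(), tsMerge, fsInputDesc);
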
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Mon Jun 27 23:40:52 2011
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -163,13 +165,13 @@ public class ConditionalResolverMergeFil
if (len >= 0) {
doMerge = true;
totalSz += len;
- work.getPathToAliases().put(status[i].getPath().toString(), aliases);
- // get the full partition spec from the path and update the PartitionDesc
Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
dpCtx.getPartSpec());
Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec);
- work.getPathToPartitionInfo().put(status[i].getPath().toString(), pDesc);
+
+ work.resolveDynamicPartitionMerge(conf, status[i].getPath(), tblDesc,
+ aliases, pDesc);
} else {
toMove.add(status[i].getPath().toString());
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Mon Jun 27 23:40:52 2011
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -431,4 +433,10 @@ public class MapredWork implements Seria
this.opParseCtxMap = opParseCtxMap;
}
+ public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+ TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+ pathToAliases.put(path.toString(), aliases);
+ pathToPartitionInfo.put(path.toString(), partDesc);
+ }
+
}
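resolveDynamicPartitionMerge() is the seam between the generic plan and the block-level merge: the base MapredWork version only registers the partition directory in pathToAliases and pathToPartitionInfo, while the MergeWork override first swaps the partition's input format to the configured block-merge format and afterwards appends the directory to its own input path list. Seen from ConditionalResolverMergeFiles, the call made for each dynamic-partition directory looks roughly like this (variable names follow the resolver code above):

    // One call per partition directory that is worth merging:
    Map<String, String> fullPartSpec =
        new LinkedHashMap<String, String>(dpCtx.getPartSpec());
    Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
    PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec);

    // For a plain MapredWork this just fills pathToAliases/pathToPartitionInfo;
    // for a MergeWork it additionally sets RCFileBlockMergeInputFormat on pDesc
    // and records the path as an input of the block merge job.
    work.resolveDynamicPartitionMerge(conf, status[i].getPath(), tblDesc,
        aliases, pDesc);
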
Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,35 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_createas1a;
+DROP TABLE rcfile_createas1b;
+
+CREATE TABLE rcfile_createas1a (key INT, value STRING)
+ PARTITIONED BY (ds string);
+INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+ SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+ SELECT * FROM src;
+
+EXPLAIN
+ CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a;
+CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a;
+
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1a
+) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1b
+) t;
+
+DROP TABLE rcfile_createas1a;
+DROP TABLE rcfile_createas1b;
Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,43 @@
+set hive.merge.rcfile.block.level=false;
+set hive.exec.dynamic.partition=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge1;
+DROP TABLE rcfile_merge1b;
+
+CREATE TABLE rcfile_merge1 (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS RCFILE;
+CREATE TABLE rcfile_merge1b (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS RCFILE;
+
+-- Use non block-level merge
+EXPLAIN
+ INSERT OVERWRITE TABLE rcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 100) as part
+ FROM src;
+INSERT OVERWRITE TABLE rcfile_merge1 PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 100) as part
+ FROM src;
+
+set hive.merge.rcfile.block.level=true;
+EXPLAIN
+ INSERT OVERWRITE TABLE rcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 100) as part
+ FROM src;
+INSERT OVERWRITE TABLE rcfile_merge1b PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 100) as part
+ FROM src;
+
+-- Verify
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM rcfile_merge1 WHERE ds='1'
+) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM rcfile_merge1b WHERE ds='1'
+) t;
+
+DROP TABLE rcfile_merge1;
+DROP TABLE rcfile_merge1b;
Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set hive.exec.dynamic.partition=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge2a;
+
+CREATE TABLE rcfile_merge2a (key INT, value STRING)
+ PARTITIONED BY (one string, two string, three string)
+ STORED AS RCFILE;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge2a PARTITION (one='1', two, three)
+ SELECT key, value, PMOD(HASH(key), 10) as two,
+ PMOD(HASH(value), 10) as three
+ FROM src;
+INSERT OVERWRITE TABLE rcfile_merge2a PARTITION (one='1', two, three)
+ SELECT key, value, PMOD(HASH(key), 10) as two,
+ PMOD(HASH(value), 10) as three
+ FROM src;
+
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM rcfile_merge2a
+) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value, '1', PMOD(HASH(key), 10),
+ PMOD(HASH(value), 10)) USING 'tr \t _' AS (c)
+ FROM src
+) t;
+
+DROP TABLE rcfile_merge2a;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
+
+CREATE TABLE rcfile_merge3a (key int, value string)
+ PARTITIONED BY (ds string) STORED AS TEXTFILE;
+CREATE TABLE rcfile_merge3b (key int, value string) STORED AS RCFILE;
+
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='1')
+ SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='2')
+ SELECT * FROM src;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge3b
+ SELECT key, value FROM rcfile_merge3a;
+INSERT OVERWRITE TABLE rcfile_merge3b
+ SELECT key, value FROM rcfile_merge3a;
+
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_merge3a
+) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_merge3b
+) t;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
+
+CREATE TABLE rcfile_merge3a (key int, value string)
+ PARTITIONED BY (ds string) STORED AS RCFILE;
+CREATE TABLE rcfile_merge3b (key int, value string) STORED AS TEXTFILE;
+
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='1')
+ SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='2')
+ SELECT * FROM src;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge3b
+ SELECT key, value FROM rcfile_merge3a;
+INSERT OVERWRITE TABLE rcfile_merge3b
+ SELECT key, value FROM rcfile_merge3a;
+
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_merge3a
+) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_merge3b
+) t;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
Added: hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out Mon Jun 27 23:40:52 2011
@@ -0,0 +1,205 @@
+PREHOOK: query: DROP TABLE rcfile_createas1a
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfile_createas1a
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE rcfile_createas1b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfile_createas1b
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE rcfile_createas1a (key INT, value STRING)
+ PARTITIONED BY (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE rcfile_createas1a (key INT, value STRING)
+ PARTITIONED BY (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@rcfile_createas1a
+PREHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+ SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfile_createas1a@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+ SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfile_createas1a@ds=1
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+ SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfile_createas1a@ds=2
+POSTHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+ SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfile_createas1a@ds=2
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN
+ CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: EXPLAIN
+ CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_CREATETABLE (TOK_TABNAME rcfile_createas1b) TOK_LIKETABLE TOK_TBLRCFILE (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME rcfile_createas1a))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_FUNCTION PMOD (TOK_FUNCTION HASH (TOK_TABLE_OR_COL key)) 50) part)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+ Stage-3
+ Stage-0 depends on stages: Stage-3, Stage-2
+ Stage-5 depends on stages: Stage-0
+ Stage-2
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ rcfile_createas1a
+ TableScan
+ alias: rcfile_createas1a
+ Select Operator
+ expressions:
+ expr: key
+ type: int
+ expr: value
+ type: string
+ expr: pmod(hash(key), 50)
+ type: int
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+
+ Stage: Stage-4
+ Conditional Operator
+
+ Stage: Stage-3
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: pfile:/data/users/franklin/hive-block-merge/build/ql/scratchdir/hive_2011-06-09_16-06-50_525_4856448737963146161/-ext-10001
+
+ Stage: Stage-0
+ Move Operator
+ files:
+ hdfs directory: true
+ destination: pfile:/data/users/franklin/hive-block-merge/build/ql/test/data/warehouse/rcfile_createas1b
+
+ Stage: Stage-5
+ Create Table Operator:
+ Create Table
+ columns: key int, value string, part int
+ if not exists: false
+ input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+ # buckets: -1
+ output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+ serde name: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+ name: rcfile_createas1b
+ isExternal: false
+
+ Stage: Stage-2
+ Block level merge
+
+
+PREHOOK: query: CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@rcfile_createas1a@ds=1
+PREHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: query: CREATE TABLE rcfile_createas1b
+ STORED AS RCFILE AS
+ SELECT key, value, PMOD(HASH(key), 50) as part
+ FROM rcfile_createas1a
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@rcfile_createas1a@ds=1
+POSTHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: Output: default@rcfile_createas1b
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1a
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfile_createas1a@ds=1
+PREHOOK: Input: default@rcfile_createas1a@ds=2
+PREHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-54_053_5965587433920310393/-mr-10000
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1a
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfile_createas1a@ds=1
+POSTHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-54_053_5965587433920310393/-mr-10000
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+14412220296
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1b
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfile_createas1b
+PREHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-57_460_3734087433150140544/-mr-10000
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+ FROM rcfile_createas1b
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfile_createas1b
+POSTHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-57_460_3734087433150140544/-mr-10000
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+14412220296
+PREHOOK: query: DROP TABLE rcfile_createas1a
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@rcfile_createas1a
+PREHOOK: Output: default@rcfile_createas1a
+POSTHOOK: query: DROP TABLE rcfile_createas1a
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@rcfile_createas1a
+POSTHOOK: Output: default@rcfile_createas1a
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: DROP TABLE rcfile_createas1b
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@rcfile_createas1b
+PREHOOK: Output: default@rcfile_createas1b
+POSTHOOK: query: DROP TABLE rcfile_createas1b
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@rcfile_createas1b
+POSTHOOK: Output: default@rcfile_createas1b
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]