You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sd...@apache.org on 2011/06/28 01:40:53 UTC

svn commit: r1140377 [1/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/or...

Author: sdong
Date: Mon Jun 27 23:40:52 2011
New Revision: 1140377

URL: http://svn.apache.org/viewvc?rev=1140377&view=rev
Log:
HIVE-2035. Use block-level merge for RCFile if merging intermediate results are needed (FranklinHu via Siying Dong)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q
    hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q
    hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q
    hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q
    hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q
    hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge4.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jun 27 23:40:52 2011
@@ -319,6 +319,11 @@ public class HiveConf extends Configurat
     HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false),
     HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000)),
     HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000)),
+    HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true),
+    HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
+        "org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat"),
+    HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
+        "hive.merge.current.job.has.dynamic.partitions", false),
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", false),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Jun 27 23:40:52 2011
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -74,6 +76,8 @@ public final class TaskFactory {
         MapredLocalTask.class));
     taskvec.add(new taskTuple<StatsWork>(StatsWork.class,
         StatsTask.class));
+    taskvec.add(new taskTuple<MergeWork>(MergeWork.class,
+        BlockMergeTask.class));
 
 
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Mon Jun 27 23:40:52 2011
@@ -49,11 +49,11 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.LogManager;
@@ -74,7 +74,7 @@ public class BlockMergeTask extends Task
     job = new JobConf(conf, BlockMergeTask.class);
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
-  
+
   public boolean requireLock() {
     return true;
   }
@@ -91,7 +91,7 @@ public class BlockMergeTask extends Task
     success = true;
     ShimLoader.getHadoopShims().setNullOutputFormat(job);
     job.setMapperClass(work.getMapperClass());
-    
+
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
     try {
@@ -109,9 +109,9 @@ public class BlockMergeTask extends Task
     job.setMapOutputKeyClass(NullWritable.class);
     job.setMapOutputValueClass(NullWritable.class);
     if(work.getNumMapTasks() != null) {
-      job.setNumMapTasks(work.getNumMapTasks());      
+      job.setNumMapTasks(work.getNumMapTasks());
     }
-    
+
     // zero reducers
     job.setNumReduceTasks(0);
 
@@ -145,18 +145,22 @@ public class BlockMergeTask extends Task
     job.setOutputKeyClass(NullWritable.class);
     job.setOutputValueClass(NullWritable.class);
 
+    HiveConf.setBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
+        work.hasDynamicPartitions());
+
     int returnVal = 0;
     RunningJob rj = null;
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
         HiveConf.ConfVars.HADOOPJOBNAME));
-    
+
     String jobName = null;
     if (noName && this.getQueryPlan() != null) {
       int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
       jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(),
           maxlen - 6);
     }
-    
+
     if (noName) {
       // This is for a special case to ensure unit tests pass
       HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME,
@@ -165,7 +169,7 @@ public class BlockMergeTask extends Task
 
     try {
       addInputPaths(job, work);
-      
+
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
       // remove the pwd from conf file so that job tracker doesn't show this
@@ -265,7 +269,7 @@ public class BlockMergeTask extends Task
     if (paths == null || paths.length == 0) {
       printUsage();
     }
-    
+
     FileSystem fs = null;
     JobConf conf = new JobConf(BlockMergeTask.class);
     HiveConf hiveConf = new HiveConf(conf, BlockMergeTask.class);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Mon Jun 27 23:40:52 2011
@@ -19,34 +19,50 @@
 package org.apache.hadoop.hive.ql.io.rcfile.merge;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.Mapper;
 
+@Explain(displayName = "Block level merge")
 public class MergeWork extends MapredWork implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   private List<String> inputPaths;
   private String outputDir;
+  private boolean hasDynamicPartitions;
 
   public MergeWork() {
   }
-  
+
   public MergeWork(List<String> inputPaths, String outputDir) {
+    this(inputPaths, outputDir, false);
+  }
+
+  public MergeWork(List<String> inputPaths, String outputDir,
+      boolean hasDynamicPartitions) {
     super();
     this.inputPaths = inputPaths;
     this.outputDir = outputDir;
+    this.hasDynamicPartitions = hasDynamicPartitions;
     PartitionDesc partDesc = new PartitionDesc();
     partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
     if(this.getPathToPartitionInfo() == null) {
       this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
     }
+    if(this.getNumReduceTasks() == null) {
+      this.setNumReduceTasks(0);
+    }
     for(String path: this.inputPaths) {
       this.getPathToPartitionInfo().put(path, partDesc);
     }
@@ -84,4 +100,30 @@ public class MergeWork extends MapredWor
     return false;
   }
 
+  public boolean hasDynamicPartitions() {
+    return this.hasDynamicPartitions;
+  }
+
+  public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+    this.hasDynamicPartitions = hasDynamicPartitions;
+  }
+
+  @Override
+  public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+      TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+
+    String inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+    try {
+      partDesc.setInputFileFormatClass((Class <? extends InputFormat>)
+          Class.forName(inputFormatClass));
+    } catch (ClassNotFoundException e) {
+      String msg = "Merge input format class not found";
+      throw new RuntimeException(msg);
+    }
+    super.resolveDynamicPartitionMerge(conf, path, tblDesc, aliases, partDesc);
+
+    // Add the DP path to the list of input paths
+    inputPaths.add(path.toString());
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java Mon Jun 27 23:40:52 2011
@@ -36,11 +36,12 @@ public class RCFileBlockMergeRecordReade
   private final long start;
   private final long end;
   private boolean more = true;
+  private final Path path;
   protected Configuration conf;
 
   public RCFileBlockMergeRecordReader(Configuration conf, FileSplit split)
       throws IOException {
-    Path path = split.getPath();
+    path = split.getPath();
     FileSystem fs = path.getFileSystem(conf);
     this.in = new RCFile.Reader(fs, path, conf);
     this.end = split.getStart() + split.getLength();
@@ -87,12 +88,13 @@ public class RCFileBlockMergeRecordReade
     if (!more) {
       return false;
     }
-    
+
     keyWrapper.keyBuffer = this.in.getCurrentKeyBufferObj();
     keyWrapper.recordLength = this.in.getCurrentBlockLength();
     keyWrapper.keyLength = this.in.getCurrentKeyLength();
     keyWrapper.compressedKeyLength = this.in.getCurrentCompressedKeyLen();
     keyWrapper.codec = this.in.getCompressionCodec();
+    keyWrapper.inputPath = path;
 
     valueWrapper.valueBuffer = this.in.getCurrentValueBufferObj();
 
@@ -106,7 +108,7 @@ public class RCFileBlockMergeRecordReade
 
   /**
    * Return the progress within the input split.
-   * 
+   *
    * @return 0.0 to 1.0 of the input byte range
    */
   public float getProgress() throws IOException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Mon Jun 27 23:40:52 2011
@@ -22,6 +22,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -33,7 +34,8 @@ public class RCFileKeyBufferWrapper impl
   protected int recordLength;
   protected int keyLength;
   protected int compressedKeyLength;
-  
+  protected Path inputPath;
+
   protected CompressionCodec codec;
 
   protected RCFileKeyBufferWrapper() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Mon Jun 27 23:40:52 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -57,6 +58,11 @@ public class RCFileMergeMapper extends M
   CompressionCodec codec = null;
   int columnNumber = 0;
 
+  boolean hasDynamicPartitions = false;
+  boolean tmpPathFixed = false;
+  Path tmpPath;
+  Path dpPath;
+
   public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
 
   public RCFileMergeMapper() {
@@ -64,12 +70,12 @@ public class RCFileMergeMapper extends M
 
   public void configure(JobConf job) {
     jc = job;
+    hasDynamicPartitions = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+
     String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
         .toString();
-    Path tmpPath = Utilities.toTempPath(specPath);
-    String taskId = Utilities.getTaskId(job);
-    finalPath = new Path(tmpPath, taskId);
-    outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+    updatePaths(Utilities.toTempPath(specPath));
     try {
       fs = (new Path(specPath)).getFileSystem(job);
       autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
@@ -80,6 +86,13 @@ public class RCFileMergeMapper extends M
     }
   }
 
+  private void updatePaths(Path tmpPath) {
+    String taskId = Utilities.getTaskId(jc);
+    this.tmpPath = tmpPath;
+    finalPath = new Path(tmpPath, taskId);
+    outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+  }
+
   @Override
   public void map(Object k, RCFileValueBufferWrapper value,
       OutputCollector<Object, Object> output, Reporter reporter)
@@ -93,6 +106,16 @@ public class RCFileMergeMapper extends M
         key = (RCFileKeyBufferWrapper) k;
       }
 
+      if (hasDynamicPartitions) {
+        if (tmpPathFixed) {
+          checkPartitionsMatch(key.inputPath.getParent());
+        } else {
+          // We haven't fixed the TMP path for this mapper yet
+          fixTmpPath(key.inputPath.getParent());
+          tmpPathFixed = true;
+        }
+      }
+
       if (outWriter == null) {
         codec = key.codec;
         columnNumber = key.keyBuffer.getColumnNumber();
@@ -117,6 +140,46 @@ public class RCFileMergeMapper extends M
     }
   }
 
+  /**
+   * Validates that each input path belongs to the same partition
+   * since each mapper merges the input to a single output directory
+   *
+   * @param inputPath
+   * @throws HiveException
+   */
+  private void checkPartitionsMatch(Path inputPath) throws HiveException {
+    if (!dpPath.equals(inputPath)) {
+      // Temp partition input path does not match exist temp path
+      String msg = "Multiple partitions for one block merge mapper: " +
+          dpPath + " NOT EQUAL TO " + inputPath;
+      LOG.error(msg);
+      throw new HiveException(msg);
+    }
+  }
+
+  /**
+   * Fixes tmpPath to point to the correct partition.
+   * Before this is called, tmpPath will default to the root tmp table dir
+   *
+   * @param inputPath
+   * @throws HiveException
+   */
+  private void fixTmpPath(Path inputPath)
+      throws HiveException {
+    dpPath = inputPath;
+    Path newPath = new Path(".");
+    int inputDepth = inputPath.depth();
+    int tmpDepth = tmpPath.depth();
+
+    // Build the path from bottom up
+    while (inputPath != null && inputPath.depth() > tmpDepth) {
+      newPath = new Path(inputPath.getName(), newPath);
+      inputDepth--;
+      inputPath = inputPath.getParent();
+    }
+    updatePaths(new Path(tmpPath, newPath));
+  }
+
   public void close() throws IOException {
     // close writer
     if (outWriter == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Jun 27 23:40:52 2011
@@ -44,6 +44,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -71,8 +74,8 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
 
 /**
  * Processor for the rule - table scan followed by reduce sink.
@@ -313,7 +316,7 @@ public class GenMRFileSink1 implements N
    * directories.
    *
    */
-  private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) {
+  private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) throws SemanticException {
 
     //
     // 1. create the operator tree
@@ -374,8 +377,31 @@ public class GenMRFileSink1 implements N
     MapRedTask currTask = (MapRedTask) ctx.getCurrTask();
     MoveWork dummyMv = new MoveWork(null, null, null,
         new LoadFileDesc(fsInputDesc.getDirName(), finalName, true, null, null), false);
-    MapredWork cplan = createMergeTask(ctx.getConf(), tsMerge, fsInputDesc);
-    // use CombineHiveInputFormat for map-only merging
+    MapredWork cplan;
+
+    if(parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
+        HIVEMERGERCFILEBLOCKLEVEL) &&
+        fsInputDesc.getTableInfo().getInputFileFormatClass().
+        equals(RCFileInputFormat.class)) {
+
+      // Check if InputFormatClass is valid
+      String inputFormatClass = parseCtx.getConf().
+          getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+      try {
+        Class c = (Class <? extends InputFormat>) Class.forName(inputFormatClass);
+
+        LOG.info("RCFile format- Using block level merge");
+        cplan = createRCFileMergeTask(fsInputDesc, finalName,
+            dpCtx != null && dpCtx.getNumDPCols() > 0);
+      } catch (ClassNotFoundException e) {
+        String msg = "Illegal input format class: " + inputFormatClass;
+        throw new SemanticException(msg);
+      }
+
+    } else {
+      cplan = createMergeTask(ctx.getConf(), tsMerge, fsInputDesc);
+      // use CombineHiveInputFormat for map-only merging
+    }
     cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
     // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
     // know if merge MR2 will be triggered at execution time
@@ -434,6 +460,46 @@ public class GenMRFileSink1 implements N
 
     return cplan;
   }
+
+  /**
+   * Create a block level merge task for RCFiles.
+   * @param fsInputDesc
+   * @param finalName
+   * @return MergeWork if table is stored as RCFile,
+   *         null otherwise
+   */
+  private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+      String finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+    String inputDir = fsInputDesc.getDirName();
+    TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+    if(tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+      ArrayList<String> inputDirs = new ArrayList<String>();
+      if (!hasDynamicPartitions) {
+        inputDirs.add(inputDir);
+      }
+
+      MergeWork work = new MergeWork(inputDirs, finalName,
+          hasDynamicPartitions);
+      LinkedHashMap<String, ArrayList<String>> pathToAliases =
+          new LinkedHashMap<String, ArrayList<String>>();
+      pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
+      work.setMapperCannotSpanPartns(true);
+      work.setPathToAliases(pathToAliases);
+      work.setAliasToWork(
+          new LinkedHashMap<String, Operator<? extends Serializable>>());
+      if (hasDynamicPartitions) {
+        work.getPathToPartitionInfo().put(inputDir,
+            new PartitionDesc(tblDesc, null));
+      }
+
+      return work;
+    }
+
+    throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+  }
+
   /**
    * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
    * @param conf HiveConf

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Mon Jun 27 23:40:52 2011
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -163,13 +165,13 @@ public class ConditionalResolverMergeFil
             if (len >= 0) {
               doMerge = true;
               totalSz += len;
-              work.getPathToAliases().put(status[i].getPath().toString(), aliases);
-              // get the full partition spec from the path and update the PartitionDesc
               Map<String, String> fullPartSpec = new LinkedHashMap<String, String>(
                   dpCtx.getPartSpec());
               Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath());
               PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec);
-              work.getPathToPartitionInfo().put(status[i].getPath().toString(), pDesc);
+
+              work.resolveDynamicPartitionMerge(conf, status[i].getPath(), tblDesc,
+                  aliases, pDesc);
             } else {
               toMove.add(status[i].getPath().toString());
             }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1140377&r1=1140376&r2=1140377&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Mon Jun 27 23:40:52 2011
@@ -26,6 +26,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -431,4 +433,10 @@ public class MapredWork implements Seria
     this.opParseCtxMap = opParseCtxMap;
   }
 
+  public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+      TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
+    pathToAliases.put(path.toString(), aliases);
+    pathToPartitionInfo.put(path.toString(), partDesc);
+  }
+
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_createas1.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,35 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_createas1a;
+DROP TABLE rcfile_createas1b;
+
+CREATE TABLE rcfile_createas1a (key INT, value STRING)
+    PARTITIONED BY (ds string);
+INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+    SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+    SELECT * FROM src;
+
+EXPLAIN
+    CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a;
+CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a;
+
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1a
+) t;
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1b
+) t;
+
+DROP TABLE rcfile_createas1a;
+DROP TABLE rcfile_createas1b;

Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge1.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,43 @@
+set hive.merge.rcfile.block.level=false;
+set hive.exec.dynamic.partition=true;
+set mapred.max.split.size=100;
+set mapref.min.split.size=1;
+
+DROP TABLE rcfile_merge1;
+DROP TABLE rcfile_merge1b;
+
+CREATE TABLE rcfile_merge1 (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS RCFILE;
+CREATE TABLE rcfile_merge1b (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS RCFILE;
+
+-- Use non block-level merge
+EXPLAIN
+    INSERT OVERWRITE TABLE rcfile_merge1 PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 100) as part
+        FROM src;
+INSERT OVERWRITE TABLE rcfile_merge1 PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 100) as part
+    FROM src;
+
+set hive.merge.rcfile.block.level=true;
+EXPLAIN
+    INSERT OVERWRITE TABLE rcfile_merge1b PARTITION (ds='1', part)
+        SELECT key, value, PMOD(HASH(key), 100) as part
+        FROM src;
+INSERT OVERWRITE TABLE rcfile_merge1b PARTITION (ds='1', part)
+    SELECT key, value, PMOD(HASH(key), 100) as part
+    FROM src;
+
+-- Verify
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM rcfile_merge1 WHERE ds='1'
+) t;
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM rcfile_merge1b WHERE ds='1'
+) t;
+
+DROP TABLE rcfile_merge1;
+DROP TABLE rcfile_merge1b;

Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge2.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set hive.exec.dynamic.partition=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge2a;
+
+CREATE TABLE rcfile_merge2a (key INT, value STRING)
+    PARTITIONED BY (one string, two string, three string)
+    STORED AS RCFILE;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge2a PARTITION (one='1', two, three)
+    SELECT key, value, PMOD(HASH(key), 10) as two, 
+        PMOD(HASH(value), 10) as three
+    FROM src;
+INSERT OVERWRITE TABLE rcfile_merge2a PARTITION (one='1', two, three)
+    SELECT key, value, PMOD(HASH(key), 10) as two, 
+        PMOD(HASH(value), 10) as three
+    FROM src;
+
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+    FROM rcfile_merge2a
+) t;
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value, '1', PMOD(HASH(key), 10), 
+        PMOD(HASH(value), 10)) USING 'tr \t _' AS (c)
+    FROM src
+) t;
+
+DROP TABLE rcfile_merge2a;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge3.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
+
+CREATE TABLE rcfile_merge3a (key int, value string) 
+    PARTITIONED BY (ds string) STORED AS TEXTFILE;
+CREATE TABLE rcfile_merge3b (key int, value string) STORED AS RCFILE;
+
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='1')
+    SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='2')
+    SELECT * FROM src;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge3b
+    SELECT key, value FROM rcfile_merge3a;
+INSERT OVERWRITE TABLE rcfile_merge3b
+    SELECT key, value FROM rcfile_merge3a;
+
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_merge3a
+) t;
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_merge3b
+) t;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;

Added: hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/rcfile_merge4.q Mon Jun 27 23:40:52 2011
@@ -0,0 +1,32 @@
+set hive.merge.rcfile.block.level=true;
+set mapred.max.split.size=100;
+set mapred.min.split.size=1;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;
+
+CREATE TABLE rcfile_merge3a (key int, value string) 
+    PARTITIONED BY (ds string) STORED AS RCFILE;
+CREATE TABLE rcfile_merge3b (key int, value string) STORED AS TEXTFILE;
+
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='1')
+    SELECT * FROM src;
+INSERT OVERWRITE TABLE rcfile_merge3a PARTITION (ds='2')
+    SELECT * FROM src;
+
+EXPLAIN INSERT OVERWRITE TABLE rcfile_merge3b
+    SELECT key, value FROM rcfile_merge3a;
+INSERT OVERWRITE TABLE rcfile_merge3b
+    SELECT key, value FROM rcfile_merge3a;
+
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_merge3a
+) t;
+SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_merge3b
+) t;
+
+DROP TABLE rcfile_merge3a;
+DROP TABLE rcfile_merge3b;

Added: hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out?rev=1140377&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out Mon Jun 27 23:40:52 2011
@@ -0,0 +1,205 @@
+PREHOOK: query: DROP TABLE rcfile_createas1a
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfile_createas1a
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE rcfile_createas1b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfile_createas1b
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE rcfile_createas1a (key INT, value STRING)
+    PARTITIONED BY (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE rcfile_createas1a (key INT, value STRING)
+    PARTITIONED BY (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@rcfile_createas1a
+PREHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+    SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfile_createas1a@ds=1
+POSTHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='1')
+    SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfile_createas1a@ds=1
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+    SELECT * FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfile_createas1a@ds=2
+POSTHOOK: query: INSERT OVERWRITE TABLE rcfile_createas1a PARTITION (ds='2')
+    SELECT * FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfile_createas1a@ds=2
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN
+    CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: EXPLAIN
+    CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_CREATETABLE (TOK_TABNAME rcfile_createas1b) TOK_LIKETABLE TOK_TBLRCFILE (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME rcfile_createas1a))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_FUNCTION PMOD (TOK_FUNCTION HASH (TOK_TABLE_OR_COL key)) 50) part)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-4 depends on stages: Stage-1 , consists of Stage-3, Stage-2
+  Stage-3
+  Stage-0 depends on stages: Stage-3, Stage-2
+  Stage-5 depends on stages: Stage-0
+  Stage-2
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        rcfile_createas1a 
+          TableScan
+            alias: rcfile_createas1a
+            Select Operator
+              expressions:
+                    expr: key
+                    type: int
+                    expr: value
+                    type: string
+                    expr: pmod(hash(key), 50)
+                    type: int
+              outputColumnNames: _col0, _col1, _col2
+              File Output Operator
+                compressed: false
+                GlobalTableId: 1
+                table:
+                    input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+
+  Stage: Stage-4
+    Conditional Operator
+
+  Stage: Stage-3
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: pfile:/data/users/franklin/hive-block-merge/build/ql/scratchdir/hive_2011-06-09_16-06-50_525_4856448737963146161/-ext-10001
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+          destination: pfile:/data/users/franklin/hive-block-merge/build/ql/test/data/warehouse/rcfile_createas1b
+
+  Stage: Stage-5
+      Create Table Operator:
+        Create Table
+          columns: key int, value string, part int
+          if not exists: false
+          input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+          # buckets: -1
+          output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+          serde name: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+          name: rcfile_createas1b
+          isExternal: false
+
+  Stage: Stage-2
+    Block level merge
+
+
+PREHOOK: query: CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@rcfile_createas1a@ds=1
+PREHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: query: CREATE TABLE rcfile_createas1b
+    STORED AS RCFILE AS 
+        SELECT key, value, PMOD(HASH(key), 50) as part
+        FROM rcfile_createas1a
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@rcfile_createas1a@ds=1
+POSTHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: Output: default@rcfile_createas1b
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1a
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfile_createas1a@ds=1
+PREHOOK: Input: default@rcfile_createas1a@ds=2
+PREHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-54_053_5965587433920310393/-mr-10000
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1a
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfile_createas1a@ds=1
+POSTHOOK: Input: default@rcfile_createas1a@ds=2
+POSTHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-54_053_5965587433920310393/-mr-10000
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+14412220296
+PREHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1b
+) t
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfile_createas1b
+PREHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-57_460_3734087433150140544/-mr-10000
+POSTHOOK: query: SELECT SUM(HASH(c)) FROM (
+    SELECT TRANSFORM(key, value) USING 'tr \t _' AS (c)
+    FROM rcfile_createas1b
+) t
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfile_createas1b
+POSTHOOK: Output: file:/tmp/franklin/hive_2011-06-09_16-06-57_460_3734087433150140544/-mr-10000
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+14412220296
+PREHOOK: query: DROP TABLE rcfile_createas1a
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@rcfile_createas1a
+PREHOOK: Output: default@rcfile_createas1a
+POSTHOOK: query: DROP TABLE rcfile_createas1a
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@rcfile_createas1a
+POSTHOOK: Output: default@rcfile_createas1a
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: DROP TABLE rcfile_createas1b
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@rcfile_createas1b
+PREHOOK: Output: default@rcfile_createas1b
+POSTHOOK: query: DROP TABLE rcfile_createas1b
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@rcfile_createas1b
+POSTHOOK: Output: default@rcfile_createas1b
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=1).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: rcfile_createas1a PARTITION(ds=2).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]