You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/13 02:39:28 UTC

svn commit: r1624688 [1/9] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ itests/src/test/resources/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/h...

Author: prasanthj
Date: Sat Sep 13 00:39:26 2014
New Revision: 1624688

URL: http://svn.apache.org/r1624688
Log:
HIVE-7704: Create tez task for fast file merging (Prasanth J, reviewed by Gunther Hagleitner, Vikram Dixit)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q
    hive/trunk/ql/src/test/results/clientpositive/orc_merge5.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge6.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge7.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge5.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge6.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge7.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge_incompat2.q.out
Removed:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q
    hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q
    hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
    hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge1.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_11.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_12.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_14.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_16.q.out
    hive/trunk/ql/src/test/results/clientpositive/union_remove_9.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 13 00:39:26 2014
@@ -803,18 +803,11 @@ public class HiveConf extends Configurat
         "map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +
         "if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),
     HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
-    HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
-        "org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat", ""),
     HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
         "When hive.merge.mapfiles or hive.merge.mapredfiles is enabled while writing a\n" +
         " table with ORC file format, enabling this config will do stripe level fast merge\n" +
         " for small ORC files. Note that enabling this config will not honor padding tolerance\n" +
         " config (hive.exec.orc.block.padding.tolerance)."),
-    HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
-        "org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat",
-        "Input file format to use for ORC stripe level merging (for internal use only)"),
-    HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
-        "hive.merge.current.job.has.dynamic.partitions", false, ""),
 
     HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
         "If this is set the header for RCFiles will simply be RCF.  If this is not\n" +
@@ -1676,17 +1669,6 @@ public class HiveConf extends Configurat
         "               it will now take 512 reducers, similarly if the max number of reducers is 511,\n" +
         "               and a job was going to use this many, it will now use 256 reducers."),
 
-    /* The following section contains all configurations used for list bucketing feature.*/
-    /* This is not for clients. but only for block merge task. */
-    /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
-    /* about alter table...concatenate and list bucketing case. */
-    HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING(
-        "hive.merge.current.job.concatenate.list.bucketing", true, ""),
-    /* This is not for clients. but only for block merge task. */
-    /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
-    /* about depth of list bucketing. */
-    HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH(
-            "hive.merge.current.job.concatenate.list.bucketing.depth", 0, ""),
     HIVEOPTLISTBUCKETING("hive.optimize.listbucketing", false,
         "Enable list bucketing optimizer. Default value is false so that we disable it by default."),
 

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Sat Sep 13 00:39:26 2014
@@ -96,6 +96,11 @@ minitez.query.files.shared=alter_merge_2
   orc_merge2.q,\
   orc_merge3.q,\
   orc_merge4.q,\
+  orc_merge5.q,\
+  orc_merge6.q,\
+  orc_merge7.q,\
+  orc_merge_incompat1.q,\
+  orc_merge_incompat2.q,\
   parallel.q,\
   ptf.q,\
   sample1.q,\

Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Sat Sep 13 00:39:26 2014
@@ -7,10 +7,6 @@
 package org.apache.hadoop.hive.ql.plan.api;
 
 
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
 public enum OperatorType implements org.apache.thrift.TEnum {
   JOIN(0),
   MAPJOIN(1),
@@ -33,7 +29,9 @@ public enum OperatorType implements org.
   PTF(18),
   MUX(19),
   DEMUX(20),
-  EVENT(21);
+  EVENT(21),
+  ORCFILEMERGE(22),
+  RCFILEMERGE(23);
 
   private final int value;
 
@@ -98,6 +96,10 @@ public enum OperatorType implements org.
         return DEMUX;
       case 21:
         return EVENT;
+      case 22:
+        return ORCFILEMERGE;
+      case 23:
+        return RCFILEMERGE;
       default:
         return null;
     }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Fast file merge operator for ORC and RCfile. This is an abstract class which
+ * does not process any rows. Refer {@link org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator}
+ * or {@link org.apache.hadoop.hive.ql.exec.RCFileMergeOperator} for more details.
+ */
+public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
+    extends Operator<T> implements Serializable {
+
+  public static final String BACKUP_PREFIX = "_backup.";
+  public static final Log LOG = LogFactory
+      .getLog(AbstractFileMergeOperator.class);
+
+  protected JobConf jc;
+  protected FileSystem fs;
+  protected boolean autoDelete;
+  protected boolean exception;
+  protected Path outPath;
+  protected Path finalPath;
+  protected Path dpPath;
+  protected Path tmpPath;
+  protected Path taskTmpPath;
+  protected int listBucketingDepth;
+  protected boolean hasDynamicPartitions;
+  protected boolean isListBucketingAlterTableConcatenate;
+  protected boolean tmpPathFixedConcatenate;
+  protected boolean tmpPathFixed;
+  protected Set<Path> incompatFileSet;
+  protected transient DynamicPartitionCtx dpCtx;
+
+  @Override
+  public void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    this.jc = new JobConf(hconf);
+    incompatFileSet = new HashSet<Path>();
+    autoDelete = false;
+    exception = false;
+    tmpPathFixed = false;
+    tmpPathFixedConcatenate = false;
+    outPath = null;
+    finalPath = null;
+    dpPath = null;
+    tmpPath = null;
+    taskTmpPath = null;
+    dpCtx = conf.getDpCtx();
+    hasDynamicPartitions = conf.hasDynamicPartitions();
+    isListBucketingAlterTableConcatenate = conf
+        .isListBucketingAlterTableConcatenate();
+    listBucketingDepth = conf.getListBucketingDepth();
+    Path specPath = conf.getOutputPath();
+    updatePaths(Utilities.toTempPath(specPath),
+        Utilities.toTaskTempPath(specPath));
+    try {
+      fs = specPath.getFileSystem(hconf);
+      autoDelete = fs.deleteOnExit(outPath);
+    } catch (IOException e) {
+      this.exception = true;
+      throw new HiveException("Failed to initialize AbstractFileMergeOperator",
+          e);
+    }
+  }
+
+  // sets up temp and task temp path
+  private void updatePaths(Path tp, Path ttp) {
+    String taskId = Utilities.getTaskId(jc);
+    tmpPath = tp;
+    taskTmpPath = ttp;
+    finalPath = new Path(tp, taskId);
+    outPath = new Path(ttp, Utilities.toTempPath(taskId));
+  }
+
+  /**
+   * Fixes tmpPath to point to the correct partition. Initialize operator will
+   * set tmpPath and taskTmpPath based on root table directory. So initially,
+   * tmpPath will be <prefix>/_tmp.-ext-10000 and taskTmpPath will be
+   * <prefix>/_task_tmp.-ext-10000. The depth of these two paths will be 0.
+   * Now, in case of dynamic partitioning or list bucketing the inputPath will
+   * have additional sub-directories under root table directory. This function
+   * updates the tmpPath and taskTmpPath to reflect these additional
+   * subdirectories. It updates tmpPath and taskTmpPath in the following way
+   * 1. finds out the difference in path based on depthDiff provided
+   * and saves the path difference in newPath
+   * 2. newPath is used to update the existing tmpPath and taskTmpPath similar
+   * to the way initializeOp() does.
+   *
+   * Note: The path difference between inputPath and tmpDepth can be DP or DP+LB.
+   * This method will automatically handle it.
+   *
+   * Continuing the example above, if inputPath is <prefix>/-ext-10000/hr=a1/,
+   * newPath will be hr=a1/. Then, tmpPath and taskTmpPath will be updated to
+   * <prefix>/-ext-10000/hr=a1/_tmp.ext-10000 and
+   * <prefix>/-ext-10000/hr=a1/_task_tmp.ext-10000 respectively.
+   * We have list_bucket_dml_6.q cover this case: DP + LP + multiple skewed
+   * values + merge.
+   *
+   * @param inputPath - input path
+   * @throws java.io.IOException
+   */
+  protected void fixTmpPath(Path inputPath, int depthDiff) throws IOException {
+
+    // don't need to update tmp paths when there is no depth difference in paths
+    if (depthDiff <=0) {
+      return;
+    }
+
+    dpPath = inputPath;
+    Path newPath = new Path(".");
+
+    // Build the path from bottom up
+    while (inputPath != null && depthDiff > 0) {
+      newPath = new Path(inputPath.getName(), newPath);
+      depthDiff--;
+      inputPath = inputPath.getParent();
+    }
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
+  }
+
+  /**
+   * Validates that each input path belongs to the same partition since each
+   * mapper merges the input to a single output directory
+   *
+   * @param inputPath - input path
+   */
+  protected void checkPartitionsMatch(Path inputPath) throws IOException {
+    if (!dpPath.equals(inputPath)) {
+      // Temp partition input path does not match exist temp path
+      String msg = "Multiple partitions for one merge mapper: " + dpPath +
+          " NOT EQUAL TO "
+          + inputPath;
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  protected void fixTmpPath(Path path) throws IOException {
+
+    // Fix temp path for alter table ... concatenate
+    if (isListBucketingAlterTableConcatenate) {
+      if (this.tmpPathFixedConcatenate) {
+        checkPartitionsMatch(path);
+      } else {
+        fixTmpPath(path, listBucketingDepth);
+        tmpPathFixedConcatenate = true;
+      }
+    } else {
+      if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+        if (tmpPathFixed) {
+          checkPartitionsMatch(path);
+        } else {
+          // We haven't fixed the TMP path for this mapper yet
+          int depthDiff = path.depth() - tmpPath.depth();
+          fixTmpPath(path, depthDiff);
+          tmpPathFixed = true;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    try {
+      if (!exception) {
+        FileStatus fss = fs.getFileStatus(outPath);
+        if (!fs.rename(outPath, finalPath)) {
+          throw new IOException(
+              "Unable to rename " + outPath + " to " + finalPath);
+        }
+        LOG.info("renamed path " + outPath + " to " + finalPath + " . File" +
+            " size is "
+            + fss.getLen());
+
+        // move any incompatible files to final path
+        if (!incompatFileSet.isEmpty()) {
+          for (Path incompatFile : incompatFileSet) {
+            String fileName = incompatFile.getName();
+            Path destFile = new Path(finalPath.getParent(), fileName);
+            try {
+              Utilities.renameOrMoveFiles(fs, incompatFile, destFile);
+              LOG.info("Moved incompatible file " + incompatFile + " to " +
+                  destFile);
+            } catch (HiveException e) {
+              LOG.error("Unable to move " + incompatFile + " to " + destFile);
+              throw new IOException(e);
+            }
+          }
+        }
+      } else {
+        if (!autoDelete) {
+          fs.delete(outPath, true);
+        }
+      }
+    } catch (IOException e) {
+      throw new HiveException("Failed to close AbstractFileMergeOperator", e);
+    }
+  }
+
+  @Override
+  public void jobCloseOp(Configuration hconf, boolean success)
+      throws HiveException {
+    try {
+      Path outputDir = conf.getOutputPath();
+      FileSystem fs = outputDir.getFileSystem(hconf);
+      Path backupPath = backupOutputPath(fs, outputDir);
+      Utilities
+          .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
+              null, reporter);
+      if (success) {
+        LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
+      }
+      if (backupPath != null) {
+        fs.delete(backupPath, true);
+      }
+    } catch (IOException e) {
+      throw new HiveException("Failed jobCloseOp for AbstractFileMergeOperator",
+          e);
+    }
+    super.jobCloseOp(hconf, success);
+  }
+
+  private Path backupOutputPath(FileSystem fs, Path outpath)
+      throws IOException, HiveException {
+    if (fs.exists(outpath)) {
+      Path backupPath = new Path(outpath.getParent(),
+          BACKUP_PREFIX + outpath.getName());
+      Utilities.rename(fs, outpath, backupPath);
+      return backupPath;
+    } else {
+      return null;
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sat Sep 13 00:39:26 2014
@@ -18,32 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -88,8 +62,9 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -133,14 +108,19 @@ import org.apache.hadoop.hive.ql.plan.De
 import org.apache.hadoop.hive.ql.plan.DropDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DropIndexDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.GrantDesc;
 import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.MsckDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
 import org.apache.hadoop.hive.ql.plan.PrivilegeDesc;
 import org.apache.hadoop.hive.ql.plan.PrivilegeObjectDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.ql.plan.RevokeDesc;
 import org.apache.hadoop.hive.ql.plan.RoleDDLDesc;
@@ -194,6 +174,33 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.hive.common.util.AnnotationUtils;
 import org.stringtemplate.v4.ST;
 
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+
 /**
  * DDLTask implementation.
  *
@@ -546,15 +553,39 @@ public class DDLTask extends Task<DDLWor
    */
   private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
       throws HiveException {
+    ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx();
+    boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir();
+    int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
+
     // merge work only needs input and output.
-    MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
-        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
+    MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(),
+        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName());
     mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
     mergeWork.resolveConcatenateMerge(db.getConf());
     mergeWork.setMapperCannotSpanPartns(true);
-    mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
+    mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass().getName());
+    final FileMergeDesc fmd;
+    if (mergeFilesDesc.getInputFormatClass().equals(RCFileInputFormat.class)) {
+      fmd = new RCFileMergeDesc();
+    } else {
+      // safe to assume else is ORC as semantic analyzer will check for RC/ORC
+      fmd = new OrcFileMergeDesc();
+    }
+
+    fmd.setDpCtx(null);
+    fmd.setHasDynamicPartitions(false);
+    fmd.setListBucketingAlterTableConcatenate(lbatc);
+    fmd.setListBucketingDepth(lbd);
+    fmd.setOutputPath(mergeFilesDesc.getOutputDir());
+
+    Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
+
+    LinkedHashMap<String, Operator<? extends  OperatorDesc>> aliasToWork =
+        new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+    aliasToWork.put(mergeFilesDesc.getInputDir().toString(), mergeOp);
+    mergeWork.setAliasToWork(aliasToWork);
     DriverContext driverCxt = new DriverContext();
-    MergeTask taskExec = new MergeTask();
+    MergeFileTask taskExec = new MergeFileTask();
     taskExec.initialize(db.getConf(), null, driverCxt);
     taskExec.setWork(mergeWork);
     taskExec.setQueryPlan(this.getQueryPlan());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Sep 13 00:39:26 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -47,7 +47,13 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
@@ -55,7 +61,12 @@ import org.apache.hadoop.util.StringUtil
 import java.io.IOException;
 import java.io.Serializable;
 import java.security.AccessControlException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * MoveTask implementation.
@@ -294,7 +305,7 @@ public class MoveTask extends Task<MoveW
           while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
             task = (Task)task.getParentTasks().get(0);
             // If it was a merge task or a local map reduce task, nothing can be inferred
-            if (task instanceof MergeTask || task instanceof MapredLocalTask) {
+            if (task instanceof MergeFileTask || task instanceof MapredLocalTask) {
               break;
             }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Sat Sep 13 00:39:26 2014
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -53,7 +49,9 @@ import org.apache.hadoop.hive.ql.plan.Li
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MuxDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -62,6 +60,10 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * OperatorFactory.
  *
@@ -108,6 +110,10 @@ public final class OperatorFactory {
         AppMasterEventOperator.class));
     opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
         AppMasterEventOperator.class));
+    opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
+        RCFileMergeOperator.class));
+    opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
+        OrcFileMergeOperator.class));
   }
 
   static {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileKeyWrapper;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileValueWrapper;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fast file merge operator for ORC files.
+ */
+public class OrcFileMergeOperator extends
+    AbstractFileMergeOperator<OrcFileMergeDesc> {
+  public final static Log LOG = LogFactory.getLog("OrcFileMergeOperator");
+
+  // These parameters must match for all orc files involved in merging. If it
+  // does not merge, the file will be put into incompatible file set and will
+  // not be merged.
+  CompressionKind compression = null;
+  long compressBuffSize = 0;
+  List<Integer> version;
+  int columnCount = 0;
+  int rowIndexStride = 0;
+
+  Writer outWriter;
+  Path prevPath;
+  private Reader reader;
+  private FSDataInputStream fdis;
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    Object[] keyValue = (Object[]) row;
+    processKeyValuePairs(keyValue[0], keyValue[1]);
+  }
+
+  private void processKeyValuePairs(Object key, Object value)
+      throws HiveException {
+    try {
+      OrcFileValueWrapper v;
+      OrcFileKeyWrapper k;
+      if (key instanceof CombineHiveKey) {
+        k = (OrcFileKeyWrapper) ((CombineHiveKey) key).getKey();
+      } else {
+        k = (OrcFileKeyWrapper) key;
+      }
+
+      fixTmpPath(k.getInputPath().getParent());
+
+      v = (OrcFileValueWrapper) value;
+
+      if (prevPath == null) {
+        prevPath = k.getInputPath();
+        reader = OrcFile.createReader(fs, k.getInputPath());
+        LOG.info("ORC merge file input path: " + k.getInputPath());
+      }
+
+      // store the orc configuration from the first file. All other files should
+      // match this configuration before merging else will not be merged
+      if (outWriter == null) {
+        compression = k.getCompression();
+        compressBuffSize = k.getCompressBufferSize();
+        version = k.getVersionList();
+        columnCount = k.getTypes().get(0).getSubtypesCount();
+        rowIndexStride = k.getRowIndexStride();
+
+        // block size and stripe size will be from config
+        outWriter = OrcFile.createWriter(outPath,
+            OrcFile.writerOptions(jc).compress(compression)
+                .inspector(reader.getObjectInspector()));
+        LOG.info("ORC merge file output path: " + outPath);
+      }
+
+      if (!checkCompatibility(k)) {
+        incompatFileSet.add(k.getInputPath());
+        return;
+      }
+
+      // next file in the path
+      if (!k.getInputPath().equals(prevPath)) {
+        reader = OrcFile.createReader(fs, k.getInputPath());
+      }
+
+      // initialize buffer to read the entire stripe
+      byte[] buffer = new byte[(int) v.getStripeInformation().getLength()];
+      fdis = fs.open(k.getInputPath());
+      fdis.readFully(v.getStripeInformation().getOffset(), buffer, 0,
+          (int) v.getStripeInformation().getLength());
+
+      // append the stripe buffer to the new ORC file
+      outWriter.appendStripe(buffer, 0, buffer.length, v.getStripeInformation(),
+          v.getStripeStatistics());
+
+      LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : "
+          + v.getStripeInformation().getOffset() + " length: "
+          + v.getStripeInformation().getLength() + " ]");
+
+      // add user metadata to footer in case of any
+      if (v.isLastStripeInFile()) {
+        outWriter.appendUserMetadata(v.getUserMetadata());
+      }
+    } catch (Throwable e) {
+      this.exception = true;
+      closeOp(true);
+      throw new HiveException(e);
+    }
+  }
+
+  private boolean checkCompatibility(OrcFileKeyWrapper k) {
+    // check compatibility with subsequent files
+    if ((k.getTypes().get(0).getSubtypesCount() != columnCount)) {
+      LOG.info("Incompatible ORC file merge! Column counts does not match for "
+          + k.getInputPath());
+      return false;
+    }
+
+    if (!k.getCompression().equals(compression)) {
+      LOG.info("Incompatible ORC file merge! Compression codec does not match" +
+          " for " + k.getInputPath());
+      return false;
+    }
+
+    if (k.getCompressBufferSize() != compressBuffSize) {
+      LOG.info("Incompatible ORC file merge! Compression buffer size does not" +
+          " match for " + k.getInputPath());
+      return false;
+
+    }
+
+    if (!k.getVersionList().equals(version)) {
+      LOG.info("Incompatible ORC file merge! Version does not match for "
+          + k.getInputPath());
+      return false;
+    }
+
+    if (k.getRowIndexStride() != rowIndexStride) {
+      LOG.info("Incompatible ORC file merge! Row index stride does not match" +
+          " for " + k.getInputPath());
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.ORCFILEMERGE;
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "OFM";
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    // close writer
+    if (outWriter == null) {
+      return;
+    }
+
+    try {
+      if (fdis != null) {
+        fdis.close();
+        fdis = null;
+      }
+
+      outWriter.close();
+      outWriter = null;
+    } catch (IOException e) {
+      throw new HiveException("Unable to close OrcFileMergeOperator", e);
+    }
+    super.closeOp(abort);
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+
+import java.io.IOException;
+
+/**
+ * Fast file merge operator for RC files.
+ */
+public class RCFileMergeOperator
+    extends AbstractFileMergeOperator<RCFileMergeDesc> {
+  public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
+
+  RCFile.Writer outWriter;
+  CompressionCodec codec = null;
+  int columnNumber = 0;
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    Object[] keyValue = (Object[]) row;
+    processKeyValuePairs(keyValue[0], keyValue[1]);
+  }
+
+  private void processKeyValuePairs(Object k, Object v)
+      throws HiveException {
+    try {
+
+      RCFileKeyBufferWrapper key;
+      if (k instanceof CombineHiveKey) {
+        key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+      } else {
+        key = (RCFileKeyBufferWrapper) k;
+      }
+      RCFileValueBufferWrapper value = (RCFileValueBufferWrapper) v;
+
+      fixTmpPath(key.getInputPath().getParent());
+
+      if (outWriter == null) {
+        codec = key.getCodec();
+        columnNumber = key.getKeyBuffer().getColumnNumber();
+        RCFileOutputFormat.setColumnNumber(jc, columnNumber);
+        outWriter = new RCFile.Writer(fs, jc, outPath, null, codec);
+      }
+
+      boolean sameCodec = ((codec == key.getCodec()) || codec.getClass().equals(
+          key.getCodec().getClass()));
+
+      if ((key.getKeyBuffer().getColumnNumber() != columnNumber) ||
+          (!sameCodec)) {
+        throw new IOException( "RCFileMerge failed because the input files" +
+            " use different CompressionCodec or have different column number" +
+            " setting.");
+      }
+
+      outWriter.flushBlock(key.getKeyBuffer(), value.getValueBuffer(),
+          key.getRecordLength(), key.getKeyLength(),
+          key.getCompressedKeyLength());
+    } catch (Throwable e) {
+      this.exception = true;
+      closeOp(true);
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    // close writer
+    if (outWriter == null) {
+      return;
+    }
+
+    try {
+      outWriter.close();
+    } catch (IOException e) {
+      throw new HiveException("Unable to close RCFileMergeOperator", e);
+    }
+    outWriter = null;
+
+    super.closeOp(abort);
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.RCFILEMERGE;
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "RFM";
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Sat Sep 13 00:39:26 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
 import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
@@ -94,8 +94,8 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
     taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
     taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
-    taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
-        MergeTask.class));
+    taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
+        MergeFileTask.class));
     taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
         DependencyCollectionTask.class));
     taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 13 00:39:26 2014
@@ -18,67 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.ExceptionListener;
-import java.beans.Expression;
-import java.beans.PersistenceDelegate;
-import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
 import org.antlr.runtime.CommonToken;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
@@ -122,9 +66,8 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
-import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -181,11 +124,66 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 /**
  * Utilities.
@@ -352,9 +350,8 @@ public final class Utilities {
         if(MAP_PLAN_NAME.equals(name)){
           if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
             gWork = deserializePlan(in, MapWork.class, conf);
-          } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
-              OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-            gWork = deserializePlan(in, MergeWork.class, conf);
+          } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+            gWork = deserializePlan(in, MergeFileWork.class, conf);
           } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
             gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
           } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Sep 13 00:39:26 2014
@@ -17,22 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.login.LoginException;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -54,6 +41,9 @@ import org.apache.hadoop.hive.ql.io.Comb
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -68,6 +58,7 @@ import org.apache.hadoop.hive.shims.Hado
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -113,9 +104,20 @@ import org.apache.tez.runtime.library.co
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -212,6 +214,16 @@ public class DagUtils {
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
     conf.set("mapred.input.format.class", inpFormat);
 
+    if (mapWork instanceof MergeFileWork) {
+      MergeFileWork mfWork = (MergeFileWork) mapWork;
+      // This mapper class is used for serializaiton/deserializaiton of merge
+      // file work.
+      conf.set("mapred.mapper.class", MergeFileMapper.class.getName());
+      conf.set("mapred.input.format.class", mfWork.getInputformat());
+      conf.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+          FileOutputFormat.class);
+    }
+
     return conf;
   }
 
@@ -486,6 +498,21 @@ public class DagUtils {
       }
     }
 
+    if (mapWork instanceof MergeFileWork) {
+      Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
+      // prepare the tmp output directory. The output tmp directory should
+      // exist before jobClose (before renaming after job completion)
+      Path tempOutPath = Utilities.toTempPath(outputPath);
+      try {
+        if (!fs.exists(tempOutPath)) {
+          fs.mkdirs(tempOutPath);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Can't make path " + outputPath + " : " + e.getMessage());
+      }
+    }
+
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
 
@@ -515,9 +542,13 @@ public class DagUtils {
     }
 
     UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
-    map = Vertex.create(mapWork.getName(),
-        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
-        setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+    String procClassName = MapTezProcessor.class.getName();
+    if (mapWork instanceof MergeFileWork) {
+      procClassName = MergeFileTezProcessor.class.getName();
+    }
+    map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
+        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
@@ -784,7 +815,7 @@ public class DagUtils {
   }
 
   /**
-   * @param path - the path from which we try to determine the resource base name
+   * @param path - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
   public String getResourceBaseName(Path path) {
@@ -831,7 +862,8 @@ public class DagUtils {
             conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
                 HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
         long sleepInterval = HiveConf.getTimeVar(
-            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
+            TimeUnit.MILLISECONDS);
         LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
             + sleepInterval);
         boolean found = false;

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.util.Map;
+
+/**
+ * Record processor for fast merging of files.
+ */
+public class MergeFileRecordProcessor extends RecordProcessor {
+
+  public static final Log LOG = LogFactory
+      .getLog(MergeFileRecordProcessor.class);
+
+  protected Operator<? extends OperatorDesc> mergeOp;
+  private final ExecMapperContext execContext = new ExecMapperContext();
+  protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+  private MergeFileWork mfWork;
+  private boolean abort = false;
+  private Object[] row = new Object[2];
+
+  @Override
+  void init(JobConf jconf, ProcessorContext processorContext,
+      MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+    super.init(jconf, processorContext, mrReporter, inputs, outputs);
+
+    //Update JobConf using MRInput, info like filename comes via this
+    MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    if (updatedConf != null) {
+      for (Map.Entry<String, String> entry : updatedConf) {
+        jconf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
+    createOutputMap();
+    // Start all the Outputs.
+    for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+      outputEntry.getValue().start();
+      ((TezProcessor.TezKVOutputCollector) outMap.get(outputEntry.getKey()))
+          .initialize();
+    }
+
+    org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
+        .getCache(jconf);
+    try {
+      execContext.setJc(jconf);
+      // create map and fetch operators
+      MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+      if (mapWork == null) {
+        mapWork = Utilities.getMapWork(jconf);
+        if (mapWork instanceof MergeFileWork) {
+          mfWork = (MergeFileWork) mapWork;
+        } else {
+          throw new RuntimeException("MapWork should be an instance of" +
+              " MergeFileWork.");
+        }
+        cache.cache(MAP_PLAN_KEY, mapWork);
+      } else {
+        Utilities.setMapWork(jconf, mapWork);
+      }
+
+      String alias = mfWork.getAliasToWork().keySet().iterator().next();
+      mergeOp = mfWork.getAliasToWork().get(alias);
+      LOG.info(mergeOp.dump(0));
+
+      MapredContext.init(true, new JobConf(jconf));
+      ((TezContext) MapredContext.get()).setInputs(inputs);
+      mergeOp.setExecContext(execContext);
+      mergeOp.initializeLocalWork(jconf);
+      mergeOp.initialize(jconf, null);
+
+      OperatorUtils.setChildrenCollector(mergeOp.getChildOperators(), outMap);
+      mergeOp.setReporter(reporter);
+      MapredContext.get().setReporter(reporter);
+    } catch (Throwable e) {
+      if (e instanceof OutOfMemoryError) {
+        // will this be true here?
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Map operator initialization failed", e);
+      }
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+  }
+
+  @Override
+  void run() throws Exception {
+    MRInputLegacy in = TezProcessor.getMRInput(inputs);
+    KeyValueReader reader = in.getReader();
+
+    //process records until done
+    while (reader.next()) {
+      boolean needMore = processRow(reader.getCurrentKey(),
+          reader.getCurrentValue());
+      if (!needMore) {
+        break;
+      }
+    }
+  }
+
+  @Override
+  void close() {
+    // check if there are IOExceptions
+    if (!abort) {
+      abort = execContext.getIoCxt().getIOExceptions();
+    }
+
+    // detecting failed executions by exceptions thrown by the operator tree
+    try {
+      if (mergeOp == null || mfWork == null) {
+        return;
+      }
+      mergeOp.close(abort);
+
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+      ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter);
+      mergeOp.preorderMap(rps);
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        l4j.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators",
+            e);
+      }
+    } finally {
+      Utilities.clearWorkMap();
+      MapredContext.close();
+    }
+  }
+
+  /**
+   * @param key   key to process
+   * @param value value to process
+   * @return true if it is not done and can take more inputs
+   */
+  private boolean processRow(Object key, Object value) {
+    // reset the execContext for each new row
+    execContext.resetRow();
+
+    try {
+      if (mergeOp.getDone()) {
+        return false; //done
+      } else {
+        row[0] = key;
+        row[1] = value;
+        mergeOp.processOp(row, 0);
+        if (isLogInfoEnabled) {
+          logProgress();
+        }
+      }
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        l4j.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+    return true; //give me more
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Tez processor for fast file merging. This is same as TezProcessor except it
+ * has different record processor.
+ */
+public class MergeFileTezProcessor extends TezProcessor {
+
+  public MergeFileTezProcessor(ProcessorContext context) {
+    super(context);
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+    rproc = new MergeFileRecordProcessor();
+    MRInputLegacy mrInput = getMRInput(inputs);
+    try {
+      mrInput.init();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed while initializing MRInput", e);
+    }
+    initializeAndRunProcessor(inputs, outputs);
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Sat Sep 13 00:39:26 2014
@@ -16,13 +16,8 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
@@ -34,8 +29,12 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -66,7 +65,7 @@ public abstract class RecordProcessor  {
   /**
    * Common initialization code for RecordProcessors
    * @param jconf
-   * @param processorContext the {@link TezProcessorContext}
+   * @param processorContext the {@link ProcessorContext}
    * @param mrReporter
    * @param inputs map of Input names to {@link LogicalInput}s
    * @param outputs map of Output names to {@link LogicalOutput}s

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sat Sep 13 00:39:26 2014
@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +34,11 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -51,13 +50,15 @@ public class TezProcessor extends Abstra
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
   protected boolean isMap = false;
 
-  RecordProcessor rproc = null;
+  protected RecordProcessor rproc = null;
 
-  private JobConf jobConf;
+  protected JobConf jobConf;
 
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
+  protected ProcessorContext processorContext;
+
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
   static {
@@ -121,9 +122,6 @@ public class TezProcessor extends Abstra
   public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
       throws Exception {
 
-    Throwable originalThrowable = null;
-
-    try{
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
@@ -142,14 +140,23 @@ public class TezProcessor extends Abstra
         rproc = new ReduceRecordProcessor();
       }
 
+      initializeAndRunProcessor(inputs, outputs);
+  }
+
+  protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs)
+      throws Exception {
+    Throwable originalThrowable = null;
+    try {
       TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
       // Start the actual Inputs. After MRInput initialization.
-      for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+      for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
         if (!cacheAccess.isInputCached(inputEntry.getKey())) {
           LOG.info("Input: " + inputEntry.getKey() + " is not cached");
           inputEntry.getValue().start();
         } else {
-          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+          LOG.info("Input: " + inputEntry.getKey() +
+              " is already cached. Skipping start");
         }
       }
 
@@ -170,7 +177,7 @@ public class TezProcessor extends Abstra
       }
 
       try {
-        if(rproc != null){
+        if (rproc != null) {
           rproc.close();
         }
       } catch (Throwable t) {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class MergeFileInputFormat extends FileInputFormat {
+
+  @Override
+  public abstract RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException;
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * Mapper for fast file merging of ORC and RC files. This is very similar to
+ * ExecMapper except that root operator is AbstractFileMergeOperator. This class
+ * name is used for serialization and deserialization of MergeFileWork.
+ */
+public class MergeFileMapper extends MapReduceBase implements Mapper {
+  public static final Log LOG = LogFactory.getLog("MergeFileMapper");
+  private static final String PLAN_KEY = "__MAP_PLAN__";
+
+  private JobConf jc;
+  private Operator<? extends OperatorDesc> op;
+  private AbstractFileMergeOperator mergeOp;
+  private Object[] row;
+  private boolean abort;
+
+  @Override
+  public void configure(JobConf job) {
+    jc = job;
+    ObjectCache cache = ObjectCacheFactory.getCache(job);
+    MapWork mapWork = (MapWork) cache.retrieve(PLAN_KEY);
+
+    // if map work is found in object cache then return it else retrieve the
+    // plan from filesystem and cache it
+    if (mapWork == null) {
+      mapWork = Utilities.getMapWork(job);
+      cache.cache(PLAN_KEY, mapWork);
+    } else {
+      Utilities.setMapWork(job, mapWork);
+    }
+
+    try {
+      if (mapWork instanceof MergeFileWork) {
+        MergeFileWork mfWork = (MergeFileWork) mapWork;
+        String alias = mfWork.getAliasToWork().keySet().iterator().next();
+        op = mfWork.getAliasToWork().get(alias);
+        if (op instanceof AbstractFileMergeOperator) {
+          mergeOp = (AbstractFileMergeOperator) op;
+          mergeOp.initializeOp(jc);
+          row = new Object[2];
+          abort = false;
+        } else {
+          abort = true;
+          throw new RuntimeException(
+              "Merge file work's top operator should be an" +
+                  " instance of AbstractFileMergeOperator");
+        }
+      } else {
+        abort = true;
+        throw new RuntimeException("Map work should be a merge file work.");
+      }
+    } catch (HiveException e) {
+      abort = true;
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      mergeOp.closeOp(abort);
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
+    super.close();
+  }
+
+  @Override
+  public void map(Object key, Object value, OutputCollector output,
+      Reporter reporter) throws IOException {
+
+    row[0] = key;
+    row[1] = value;
+    try {
+      mergeOp.processOp(row, 0);
+    } catch (HiveException e) {
+      abort = true;
+      throw new IOException(e);
+    }
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+
+public class MergeFileOutputFormat extends
+    FileOutputFormat<Object, Object> {
+
+  @Override
+  public RecordWriter<Object, Object> getRecordWriter(FileSystem ignored, JobConf job, String name,
+      Progressable progress) throws IOException {
+    return new RecordWriter<Object, Object>() {
+      public void write(Object key, Object value) {
+        throw new RuntimeException("Should not be called");
+      }
+
+      public void close(Reporter reporter) {
+      }
+    };
+  }
+
+}
\ No newline at end of file