Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 03:31:35 UTC

[12/13] hive git commit: HIVE-14990 : run all tests for MM tables and fix the issues that are found - more issues (Sergey Shelukhin)

HIVE-14990 : run all tests for MM tables and fix the issues that are found - more issues (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1155ed75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1155ed75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1155ed75

Branch: refs/heads/hive-14535
Commit: 1155ed756dd4cf0c73494ee146a47e9e0aa39575
Parents: 46e7657
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Nov 14 19:27:09 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Nov 14 19:27:09 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   6 +
 .../hadoop/hive/common/ValidWriteIds.java       |  39 ++++++-
 .../hive/ql/exec/AbstractFileMergeOperator.java |   4 +-
 .../apache/hadoop/hive/ql/exec/CopyTask.java    |   1 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   3 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   2 +-
 .../hadoop/hive/ql/exec/TableScanOperator.java  |   4 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  51 ++++++---
 .../hive/ql/io/CombineHiveInputFormat.java      |  14 ++-
 .../hadoop/hive/ql/io/merge/MergeFileWork.java  |   7 +-
 .../ql/io/rcfile/stats/PartialScanMapper.java   |   5 +-
 .../ql/io/rcfile/stats/PartialScanTask.java     |   2 +-
 .../ql/io/rcfile/stats/PartialScanWork.java     |   6 +-
 .../formatting/JsonMetaDataFormatter.java       |   2 +-
 .../formatting/TextMetaDataFormatter.java       | 114 ++++++++++---------
 .../hive/ql/optimizer/GenMRTableScan1.java      |   4 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   2 +-
 .../physical/GenMRSkewJoinProcessor.java        |   2 +-
 .../physical/GenSparkSkewJoinProcessor.java     |   4 +-
 .../physical/LocalMapJoinProcFactory.java       |   2 +-
 .../physical/index/IndexWhereProcessor.java     |   2 +-
 .../ql/parse/AlterTablePartMergeFilesDesc.java  |   9 ++
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |   1 +
 .../hadoop/hive/ql/parse/IndexUpdater.java      |  40 ++++---
 .../hive/ql/parse/ProcessAnalyzeTable.java      |   4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  22 ++--
 .../parse/spark/SparkProcessAnalyzeTable.java   |   4 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  10 +-
 .../hadoop/hive/ql/plan/PartitionDesc.java      |  11 ++
 .../apache/hadoop/hive/ql/plan/TableDesc.java   |   7 ++
 .../hive/ql/exec/TestFileSinkOperator.java      |   3 +-
 31 files changed, 250 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 95a553b..2660cce 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
@@ -517,6 +518,11 @@ public final class FileUtils {
     return true;
   }
 
+  public static boolean mkdir(FileSystem fs, Path f, Configuration conf) throws IOException {
+    boolean inheritPerms = HiveConf.getBoolVar(conf, ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+    return mkdir(fs, f, inheritPerms, conf);
+  }
+
   /**
    * Creates the directory and all necessary parent directories.
    * @param fs FileSystem to use
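
The new two-argument mkdir() overload resolves HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS from the configuration itself and delegates to the existing mkdir(fs, f, inheritPerms, conf). A minimal caller sketch (not part of this commit; the destination path is made up):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class MkdirSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true);
        Path dir = new Path("/tmp/mm_ctas_example");   // hypothetical destination
        FileSystem fs = dir.getFileSystem(conf);
        // The overload looks up the inherit-perms flag and then calls
        // the pre-existing mkdir(fs, dir, inheritPerms, conf).
        boolean created = FileUtils.mkdir(fs, dir, conf);
        System.out.println("created: " + created);
      }
    }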

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index 88d6dfa..2ce4040 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -31,6 +31,7 @@ public class ValidWriteIds {
   public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);
 
   public static final String MM_PREFIX = "mm";
+  private static final String CURRENT_SUFFIX = ".current";
 
   private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);
 
@@ -53,9 +54,11 @@ public class ValidWriteIds {
   }
 
   public static ValidWriteIds createFromConf(Configuration conf, String fullTblName) {
-    String idStr = conf.get(createConfKey(fullTblName), null);
+    String key = createConfKey(fullTblName);
+    String idStr = conf.get(key, null);
+    String current = conf.get(key + CURRENT_SUFFIX, null);
     if (idStr == null || idStr.isEmpty()) return null;
-    return new ValidWriteIds(idStr);
+    return new ValidWriteIds(idStr, current);
   }
 
   private static String createConfKey(String dbName, String tblName) {
@@ -66,7 +69,7 @@ public class ValidWriteIds {
     return VALID_WRITEIDS_PREFIX + fullName;
   }
 
-  private ValidWriteIds(String src) {
+  private ValidWriteIds(String src, String current) {
     // TODO: lifted from ACID config implementation... optimize if needed? e.g. ranges, base64
     String[] values = src.split(":");
     highWatermark = Long.parseLong(values[0]);
@@ -77,25 +80,48 @@ public class ValidWriteIds {
       for(int i = 3; i < values.length; ++i) {
         ids.add(Long.parseLong(values[i]));
       }
+      if (current != null) {
+        long currentId = Long.parseLong(current);
+        if (areIdsValid) {
+          ids.add(currentId);
+        } else {
+          ids.remove(currentId);
+        }
+      }
+    } else if (current != null) {
+        long currentId = Long.parseLong(current);
+        areIdsValid = true;
+        ids = new HashSet<Long>();
+        ids.add(currentId);
     } else {
       areIdsValid = false;
       ids = null;
     }
   }
 
+  public static void addCurrentToConf(
+      Configuration conf, String dbName, String tblName, long mmWriteId) {
+    String key = createConfKey(dbName, tblName) + CURRENT_SUFFIX;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting " + key + " => " + mmWriteId);
+    }
+    conf.set(key, Long.toString(mmWriteId));
+  }
+
   public void addToConf(Configuration conf, String dbName, String tblName) {
     if (source == null) {
       source = toString();
     }
+    String key = createConfKey(dbName, tblName);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Setting " + createConfKey(dbName, tblName) + " => " + source);
+      LOG.debug("Setting " + key + " => " + source
+          + " (old value was " + conf.get(key, null) + ")");
     }
-    conf.set(createConfKey(dbName, tblName), source);
+    conf.set(key, source);
   }
 
   public static void clearConf(HiveConf conf, String dbName, String tblName) {
     if (LOG.isDebugEnabled()) {
-      // TODO# remove
       LOG.debug("Unsetting " + createConfKey(dbName, tblName));
     }
     conf.unset(createConfKey(dbName, tblName));
@@ -188,4 +214,5 @@ public class ValidWriteIds {
     }
     return writeId;
   }
+
 }
\ No newline at end of file
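
The ".current" companion key added above lets a statement see its own in-flight write ID: addCurrentToConf() publishes the ID, and createFromConf() folds it into whatever it parsed, adding it in valid-list mode and un-excluding it in invalid-list mode. A self-contained illustration of that folding step (plain Java mirroring the constructor change; the sample IDs are made up):

    import java.util.HashSet;
    import java.util.Set;

    public class CurrentWriteIdSketch {
      // Mirrors the new logic: in valid-list mode the in-flight ID becomes readable;
      // in invalid-list mode it is no longer treated as excluded.
      static Set<Long> foldCurrent(Set<Long> ids, boolean areIdsValid, String current) {
        if (current == null) {
          return ids;
        }
        long currentId = Long.parseLong(current);
        if (areIdsValid) {
          ids.add(currentId);
        } else {
          ids.remove(currentId);
        }
        return ids;
      }

      public static void main(String[] args) {
        Set<Long> valid = new HashSet<Long>();
        valid.add(7L);
        valid.add(9L);
        System.out.println(foldCurrent(valid, true, "11"));    // now contains 7, 9 and 11

        Set<Long> invalid = new HashSet<Long>();
        invalid.add(11L);
        System.out.println(foldCurrent(invalid, false, "11")); // now empty
      }
    }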

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index dedbb78..1315b99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -123,7 +123,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
       outPath = new Path(ttp, Utilities.toTempPath(taskId));
     }
     Utilities.LOG14535.info("Paths for merge " + taskId + ": tmp " + tmpPath + ", task "
-        + taskTmpPath + ", final " + finalPath + ", out " + outPath, new Exception());
+        + taskTmpPath + ", final " + finalPath + ", out " + outPath);
   }
 
   /**
@@ -297,7 +297,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
         // We don't expect missing buckets from mere (actually there should be no buckets),
         // so just pass null as bucketing context. Union suffix should also be accounted for.
         Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
-            dpLevels, lbLevels, null, mmWriteId, reporter);
+            dpLevels, lbLevels, null, mmWriteId, reporter, false);
       }
 
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index 9f89ea5..e8526f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -110,6 +110,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
   // Note: initially copied from LoadSemanticAnalyzer.
   private static FileStatus[] matchFilesOrDir(
       FileSystem fs, Path path, boolean isSourceMm) throws IOException {
+    if (!fs.exists(path)) return null;
     if (!isSourceMm) return matchFilesOneDir(fs, path, null);
     // TODO: this doesn't handle list bucketing properly. Does the original exim do that?
     FileStatus[] mmDirs = fs.listStatus(path, new ValidWriteIds.AnyIdDirFilter());
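
The exists() check above matters because listStatus() on a missing path throws FileNotFoundException on most FileSystem implementations; returning null instead lets the caller treat an absent source as "nothing to copy". A stripped-down sketch of the guard (plain Hadoop FS API, not the real matchFilesOrDir):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CopyGuardSketch {
      // Simplified: the real method additionally filters MM write-id directories
      // via ValidWriteIds.AnyIdDirFilter when the source is an MM table.
      static FileStatus[] listIfPresent(FileSystem fs, Path path) throws IOException {
        if (!fs.exists(path)) {
          return null;               // absent source: nothing to copy
        }
        return fs.listStatus(path);  // would throw FileNotFoundException without the guard
      }
    }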

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 1e348c6..be65f49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -651,7 +651,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
     // merge work only needs input and output.
     MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(),
-        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName());
+        mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName(),
+        mergeFilesDesc.getTableDesc());
     LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>();
     ArrayList<String> inputDirstr = new ArrayList<String>(1);
     inputDirstr.add(mergeFilesDesc.getInputDir().toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2864af4..5406de9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -1230,7 +1230,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           MissingBucketsContext mbc = new MissingBucketsContext(
               conf.getTableInfo(), numBuckets, conf.getCompressed());
           Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
-              dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter);
+              dpLevels, lbLevels, mbc, conf.getMmWriteId(), reporter, conf.isMmCtas());
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 68477ca..bb0af7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -78,11 +78,11 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
   private String schemaEvolutionColumns;
   private String schemaEvolutionColumnsTypes;
 
-  public TableDesc getTableDesc() {
+  public TableDesc getTableDescSkewJoin() {
     return tableDesc;
   }
 
-  public void setTableDesc(TableDesc tableDesc) {
+  public void setTableDescSkewJoin(TableDesc tableDesc) {
     this.tableDesc = tableDesc;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8c9f622..1b7a8a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1113,6 +1113,9 @@ public final class Utilities {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
     }
+    if (orig.getName().contains("=1")) {
+      LOG.error("TODO# creating tmp path from " + orig, new Exception());
+    }
     return new Path(orig.getParent(), tmpPrefix + orig.getName());
   }
 
@@ -3312,8 +3315,8 @@ public final class Utilities {
 
       if (op instanceof FileSinkOperator) {
         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+        if (fdesc.isMmTable()) continue; // No need to create for MM tables
         Path tempDir = fdesc.getDirName();
-
         if (tempDir != null) {
           Path tempPath = Utilities.toTempPath(tempDir);
           FileSystem fs = tempPath.getFileSystem(conf);
@@ -3970,7 +3973,7 @@ public final class Utilities {
 
   public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId,
-      Reporter reporter) throws IOException, HiveException {
+      Reporter reporter, boolean isMmCtas) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
     Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix);
     if (!success) {
@@ -3982,20 +3985,30 @@ public final class Utilities {
 
     Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
     // TODO# may be wrong if there are no splits (empty insert/CTAS)
-    FileStatus[] manifestFiles = fs.listStatus(manifestDir);
     List<Path> manifests = new ArrayList<>();
-    if (manifestFiles != null) {
-      for (FileStatus status : manifestFiles) {
-        Path path = status.getPath();
-        if (path.getName().endsWith(MANIFEST_EXTENSION)) {
-          Utilities.LOG14535.info("Reading manifest " + path);
-          manifests.add(path);
+    if (fs.exists(manifestDir)) {
+      FileStatus[] manifestFiles = fs.listStatus(manifestDir);
+      if (manifestFiles != null) {
+        for (FileStatus status : manifestFiles) {
+          Path path = status.getPath();
+          if (path.getName().endsWith(MANIFEST_EXTENSION)) {
+            Utilities.LOG14535.info("Reading manifest " + path);
+            manifests.add(path);
+          }
         }
       }
+    } else {
+      Utilities.LOG14535.info("No manifests found - query produced no output");
+      manifestDir = null;
     }
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
     ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    if (isMmCtas && !fs.exists(specPath)) {
+      // TODO: do we also need to do this when creating an empty partition from select?
+      Utilities.LOG14535.info("Creating table directory for CTAS with no output at " + specPath);
+      FileUtils.mkdir(fs, specPath, hconf);
+    }
     Path[] files = getMmDirectoryCandidates(
         fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf);
     ArrayList<Path> mmDirectories = new ArrayList<>();
@@ -4019,15 +4032,17 @@ public final class Utilities {
       }
     }
 
-    Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
-    tryDelete(fs, manifestDir);
-    if (unionSuffix != null) {
-      // Also delete the parent directory if we are the last union FSOP to execute.
-      manifestDir = manifestDir.getParent();
-      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
-      if (remainingFiles == null || remainingFiles.length == 0) {
-        Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
-        tryDelete(fs, manifestDir);
+    if (manifestDir != null) {
+      Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+      tryDelete(fs, manifestDir);
+      if (unionSuffix != null) {
+        // Also delete the parent directory if we are the last union FSOP to execute.
+        manifestDir = manifestDir.getParent();
+        FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+        if (remainingFiles == null || remainingFiles.length == 0) {
+          Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+          tryDelete(fs, manifestDir);
+        }
       }
     }
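
With the changes above, a missing manifest directory now means "the query produced no output" rather than an error, and an MM CTAS with no output still gets its table directory created (via the new FileUtils.mkdir overload). A standalone sketch of the manifest-scan part (plain Hadoop FS API; the ".manifest" suffix is only a stand-in for MANIFEST_EXTENSION):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ManifestScanSketch {
      static List<Path> findManifests(FileSystem fs, Path manifestDir) throws IOException {
        List<Path> manifests = new ArrayList<Path>();
        if (!fs.exists(manifestDir)) {
          return manifests;          // empty insert/CTAS: no FSOP ever wrote a manifest
        }
        for (FileStatus status : fs.listStatus(manifestDir)) {
          if (status.getPath().getName().endsWith(".manifest")) {  // stand-in suffix
            manifests.add(status.getPath());
          }
        }
        return manifests;
      }
    }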
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index f0257ff..86397af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -111,12 +111,16 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
             getInputFormatFromCache(inputFormatClass, conf);
         boolean isAvoidSplitCombine = inputFormat instanceof AvoidSplitCombination &&
             ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf);
+        TableDesc tbl = part.getTableDesc();
+        boolean isMmNonMerge = false;
+        if (tbl != null) {
+          isMmNonMerge = !isMerge && MetaStoreUtils.isInsertOnlyTable(tbl.getProperties());
+        } else {
+          // This would be the case for obscure tasks like truncate column (unsupported for MM).
+          Utilities.LOG14535.warn("Assuming not insert-only; no table in partition spec " + part);
+        }
 
-        // Combined splits are not supported for MM tables right now.
-        // However, the merge for MM always combines one directory and should ignore that it's MM.
-        boolean isMmTableNonMerge = !isMerge
-            && MetaStoreUtils.isInsertOnlyTable(part.getTableDesc().getProperties());
-        if (isAvoidSplitCombine || isMmTableNonMerge) {
+        if (isAvoidSplitCombine || isMmNonMerge) {
           //if (LOG.isDebugEnabled()) {
             Utilities.LOG14535.info("The path [" + paths[i + start] +
                 "] is being parked for HiveInputFormat.getSplits");

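This is also why TableDesc is threaded into MergeFileWork and PartialScanWork below: without a table descriptor on the partition, the check cannot tell whether a path belongs to an insert-only (MM) table and has to assume it does not. A method-level sketch of the new predicate (names resolve as inside CombineHiveInputFormat, which already uses TableDesc and MetaStoreUtils):

    // Sketch only: combining is skipped when the input format opts out, or when the
    // partition's table is insert-only and this is not the single-directory merge path.
    static boolean skipCombining(boolean isAvoidSplitCombine, boolean isMerge, TableDesc tbl) {
      boolean isMmNonMerge = false;
      if (tbl != null) {
        isMmNonMerge = !isMerge && MetaStoreUtils.isInsertOnlyTable(tbl.getProperties());
      }
      return isAvoidSplitCombine || isMmNonMerge;
    }
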
http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
index 94b9431..8d340df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
@@ -60,13 +60,13 @@ public class MergeFileWork extends MapWork {
   private Class<? extends InputFormat> internalInputFormat;
 
   public MergeFileWork(List<Path> inputPaths, Path outputDir,
-      String srcTblInputFormat) {
-    this(inputPaths, outputDir, false, srcTblInputFormat);
+      String srcTblInputFormat, TableDesc tbl) {
+    this(inputPaths, outputDir, false, srcTblInputFormat, tbl);
   }
 
   public MergeFileWork(List<Path> inputPaths, Path outputDir,
       boolean hasDynamicPartitions,
-      String srcTblInputFormat) {
+      String srcTblInputFormat, TableDesc tbl) {
     this.inputPaths = inputPaths;
     this.outputDir = outputDir;
     this.hasDynamicPartitions = hasDynamicPartitions;
@@ -78,6 +78,7 @@ public class MergeFileWork extends MapWork {
       this.internalInputFormat = RCFileBlockMergeInputFormat.class;
     }
     partDesc.setInputFileFormatClass(internalInputFormat);
+    partDesc.setTableDesc(tbl);
     for (Path path : this.inputPaths) {
       this.addPathToPartitionInfo(path, partDesc);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 09e4a47..9a6406d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -88,8 +88,9 @@ public class PartialScanMapper extends MapReduceBase implements
     }
 
     try {
-      //CombineHiveInputFormat is set in PartialScanTask.
-      RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+      //CombineHiveInputFormat may be set in PartialScanTask.
+      RCFileKeyBufferWrapper key = (RCFileKeyBufferWrapper)
+          ((k instanceof CombineHiveKey) ?  ((CombineHiveKey) k).getKey() : k);
 
       // calculate rawdatasize
       KeyBuffer keyBuffer = key.getKeyBuffer();
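
The instanceof check above is needed because keys are only wrapped in CombineHiveKey when CombineHiveInputFormat is used; with the change above that parks MM paths for HiveInputFormat, the partial-scan mapper can now see unwrapped keys as well. A method-level sketch (names resolve as inside PartialScanMapper):

    // Unwrap the key whether or not it came through CombineHiveInputFormat.
    static RCFileKeyBufferWrapper unwrap(Object k) {
      Object raw = (k instanceof CombineHiveKey) ? ((CombineHiveKey) k).getKey() : k;
      return (RCFileKeyBufferWrapper) raw;
    }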

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index dcd0e97..c8cd27d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -350,7 +350,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
     }
 
     QueryState queryState = new QueryState(new HiveConf(conf, PartialScanTask.class));
-    PartialScanWork mergeWork = new PartialScanWork(inputPaths);
+    PartialScanWork mergeWork = new PartialScanWork(inputPaths, null);
     DriverContext driverCxt = new DriverContext();
     PartialScanTask taskExec = new PartialScanTask();
     taskExec.initialize(queryState, null, driverCxt, new CompilationOpContext());

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
index 919cea0..d8ee7d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.Mapper;
 
 /**
@@ -47,15 +48,18 @@ public class PartialScanWork extends MapWork implements Serializable {
   private transient List<Path> inputPaths;
   private String aggKey;
   private String statsTmpDir;
+  private TableDesc tblDesc;
 
   public PartialScanWork() {
   }
 
-  public PartialScanWork(List<Path> inputPaths) {
+  public PartialScanWork(List<Path> inputPaths, TableDesc tblDesc) {
     super();
     this.inputPaths = inputPaths;
+    this.tblDesc = tblDesc;
     PartitionDesc partDesc = new PartitionDesc();
     partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class);
+    partDesc.setTableDesc(tblDesc);
     for(Path path: this.inputPaths) {
       this.addPathToPartitionInfo(path, partDesc);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
index 3315806..68079c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
@@ -238,7 +238,7 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
    * @param tblPath not NULL
    * @throws IOException
    */
-  // Duplicates logic in TextMetaDataFormatter
+  // Duplicates logic in TextMetaDataFormatter TODO: wtf?!!
   private void putFileSystemsStats(MapBuilder builder, List<Path> locations,
       HiveConf conf, Path tblPath)
           throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
index b990bda..22908d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.metadata.formatting;
 
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -280,28 +281,30 @@ class TextMetaDataFormatter implements MetaDataFormatter {
     } catch (IOException e) {
       throw new HiveException(e);
     }
-          }
+  }
+
+  private static class FileData {
+    public long totalFileSize = 0;
+    public long maxFileSize = 0;
+    public long minFileSize = Long.MAX_VALUE;
+    public long lastAccessTime = 0;
+    public long lastUpdateTime = 0;
+    public int numOfFiles = 0;
+  }
 
+  // TODO: why is this in text formatter? grrr
   private void writeFileSystemStats(DataOutputStream outStream,
       HiveConf conf,
       List<Path> locations,
-      Path tblPath, boolean partSpecified, int indent)
-          throws IOException
-          {
-    long totalFileSize = 0;
-    long maxFileSize = 0;
-    long minFileSize = Long.MAX_VALUE;
-    long lastAccessTime = 0;
-    long lastUpdateTime = 0;
-    int numOfFiles = 0;
-
+      Path tblPath, boolean partSpecified, int indent) throws IOException {
+    FileData fd = new FileData();
     boolean unknown = false;
     FileSystem fs = tblPath.getFileSystem(conf);
     // in case all files in locations do not exist
     try {
       FileStatus tmpStatus = fs.getFileStatus(tblPath);
-      lastAccessTime = tmpStatus.getAccessTime();
-      lastUpdateTime = tmpStatus.getModificationTime();
+      fd.lastAccessTime = tmpStatus.getAccessTime();
+      fd.lastUpdateTime = tmpStatus.getModificationTime();
       if (partSpecified) {
         // check whether the part exists or not in fs
         tmpStatus = fs.getFileStatus(locations.get(0));
@@ -316,42 +319,12 @@ class TextMetaDataFormatter implements MetaDataFormatter {
       for (Path loc : locations) {
         try {
           FileStatus status = fs.getFileStatus(tblPath);
-          FileStatus[] files = fs.listStatus(loc);
-          long accessTime = status.getAccessTime();
-          long updateTime = status.getModificationTime();
           // no matter loc is the table location or part location, it must be a
           // directory.
           if (!status.isDir()) {
             continue;
           }
-          if (accessTime > lastAccessTime) {
-            lastAccessTime = accessTime;
-          }
-          if (updateTime > lastUpdateTime) {
-            lastUpdateTime = updateTime;
-          }
-          for (FileStatus currentStatus : files) {
-            if (currentStatus.isDir()) {
-              continue;
-            }
-            numOfFiles++;
-            long fileLen = currentStatus.getLen();
-            totalFileSize += fileLen;
-            if (fileLen > maxFileSize) {
-              maxFileSize = fileLen;
-            }
-            if (fileLen < minFileSize) {
-              minFileSize = fileLen;
-            }
-            accessTime = currentStatus.getAccessTime();
-            updateTime = currentStatus.getModificationTime();
-            if (accessTime > lastAccessTime) {
-              lastAccessTime = accessTime;
-            }
-            if (updateTime > lastUpdateTime) {
-              lastUpdateTime = updateTime;
-            }
-          }
+          processDir(status, fs, fd);
         } catch (IOException e) {
           // ignore
         }
@@ -363,29 +336,29 @@ class TextMetaDataFormatter implements MetaDataFormatter {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("totalNumberFiles:".getBytes("UTF-8"));
-    outStream.write((unknown ? unknownString : "" + numOfFiles).getBytes("UTF-8"));
+    outStream.write((unknown ? unknownString : "" + fd.numOfFiles).getBytes("UTF-8"));
     outStream.write(terminator);
 
     for (int k = 0; k < indent; k++) {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("totalFileSize:".getBytes("UTF-8"));
-    outStream.write((unknown ? unknownString : "" + totalFileSize).getBytes("UTF-8"));
+    outStream.write((unknown ? unknownString : "" + fd.totalFileSize).getBytes("UTF-8"));
     outStream.write(terminator);
 
     for (int k = 0; k < indent; k++) {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("maxFileSize:".getBytes("UTF-8"));
-    outStream.write((unknown ? unknownString : "" + maxFileSize).getBytes("UTF-8"));
+    outStream.write((unknown ? unknownString : "" + fd.maxFileSize).getBytes("UTF-8"));
     outStream.write(terminator);
 
     for (int k = 0; k < indent; k++) {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("minFileSize:".getBytes("UTF-8"));
-    if (numOfFiles > 0) {
-      outStream.write((unknown ? unknownString : "" + minFileSize).getBytes("UTF-8"));
+    if (fd.numOfFiles > 0) {
+      outStream.write((unknown ? unknownString : "" + fd.minFileSize).getBytes("UTF-8"));
     } else {
       outStream.write((unknown ? unknownString : "" + 0).getBytes("UTF-8"));
     }
@@ -395,17 +368,52 @@ class TextMetaDataFormatter implements MetaDataFormatter {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("lastAccessTime:".getBytes("UTF-8"));
-    outStream.writeBytes((unknown || lastAccessTime < 0) ? unknownString : ""
-        + lastAccessTime);
+    outStream.writeBytes((unknown || fd.lastAccessTime < 0) ? unknownString : ""
+        + fd.lastAccessTime);
     outStream.write(terminator);
 
     for (int k = 0; k < indent; k++) {
       outStream.write(Utilities.INDENT.getBytes("UTF-8"));
     }
     outStream.write("lastUpdateTime:".getBytes("UTF-8"));
-    outStream.write((unknown ? unknownString : "" + lastUpdateTime).getBytes("UTF-8"));
+    outStream.write((unknown ? unknownString : "" + fd.lastUpdateTime).getBytes("UTF-8"));
     outStream.write(terminator);
-          }
+  }
+
+  private void processDir(FileStatus status, FileSystem fs, FileData fd) throws IOException {
+    long accessTime = status.getAccessTime();
+    long updateTime = status.getModificationTime();
+    if (accessTime > fd.lastAccessTime) {
+      fd.lastAccessTime = accessTime;
+    }
+    if (updateTime > fd.lastUpdateTime) {
+      fd.lastUpdateTime = updateTime;
+    }
+    FileStatus[] files = fs.listStatus(status.getPath());
+    for (FileStatus currentStatus : files) {
+      if (currentStatus.isDir()) {
+        processDir(currentStatus, fs, fd);
+        continue;
+      }
+      fd.numOfFiles++;
+      long fileLen = currentStatus.getLen();
+      fd.totalFileSize += fileLen;
+      if (fileLen > fd.maxFileSize) {
+        fd.maxFileSize = fileLen;
+      }
+      if (fileLen < fd.minFileSize) {
+        fd.minFileSize = fileLen;
+      }
+      accessTime = currentStatus.getAccessTime();
+      updateTime = currentStatus.getModificationTime();
+      if (accessTime > fd.lastAccessTime) {
+        fd.lastAccessTime = accessTime;
+      }
+      if (updateTime > fd.lastUpdateTime) {
+        fd.lastUpdateTime = updateTime;
+      }
+    }
+  }
 
   /**
    * Show the table partitions.
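
The refactor above replaces six local counters with a FileData holder and, more importantly, makes processDir() recurse into subdirectories, so the reported statistics include files in nested directories (for example MM write-id directories) instead of skipping them. A self-contained sketch of the recursive aggregation (plain Hadoop FS API; FileData mirrors the new inner class):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileStatsSketch {
      static class FileData {                  // mirrors the new inner class
        long totalFileSize = 0;
        long maxFileSize = 0;
        long minFileSize = Long.MAX_VALUE;
        long lastAccessTime = 0;
        long lastUpdateTime = 0;
        int numOfFiles = 0;
      }

      static void processDir(FileStatus dir, FileSystem fs, FileData fd) throws IOException {
        fd.lastAccessTime = Math.max(fd.lastAccessTime, dir.getAccessTime());
        fd.lastUpdateTime = Math.max(fd.lastUpdateTime, dir.getModificationTime());
        for (FileStatus st : fs.listStatus(dir.getPath())) {
          if (st.isDirectory()) {
            processDir(st, fs, fd);            // recursing is the behavioral change
            continue;
          }
          fd.numOfFiles++;
          long len = st.getLen();
          fd.totalFileSize += len;
          fd.maxFileSize = Math.max(fd.maxFileSize, len);
          fd.minFileSize = Math.min(fd.minFileSize, len);
          fd.lastAccessTime = Math.max(fd.lastAccessTime, st.getAccessTime());
          fd.lastUpdateTime = Math.max(fd.lastUpdateTime, st.getModificationTime());
        }
      }
    }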

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 9297a0b..78d1e54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -195,7 +196,8 @@ public class GenMRTableScan1 implements NodeProcessor {
     aggregationKey = aggregationKeyBuffer.toString();
 
     // scan work
-    PartialScanWork scanWork = new PartialScanWork(inputPaths);
+    PartialScanWork scanWork = new PartialScanWork(inputPaths,
+        Utilities.getTableDesc(op.getConf().getTableMetadata()));
     scanWork.setMapperCannotSpanPartns(true);
     scanWork.setAggKey(aggregationKey);
     scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir(), parseCtx.getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index c3228d3..5107a89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1598,7 +1598,7 @@ public final class GenMapRedUtils {
     Utilities.LOG14535.info("creating mergefilework from " + inputDirs + " to " + finalName);
     // create the merge file work
     MergeFileWork work = new MergeFileWork(inputDirs, finalName,
-        hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
+        hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName(), tblDesc);
     LinkedHashMap<Path, ArrayList<String>> pathToAliases = new LinkedHashMap<>();
     pathToAliases.put(inputDir, inputDirstr);
     work.setMapperCannotSpanPartns(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index ede4fcb..b1f4577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -255,7 +255,7 @@ public final class GenMRSkewJoinProcessor {
         Operator<? extends OperatorDesc> ts =
             GenMapRedUtils.createTemporaryTableScanOperator(
                 joinOp.getCompilationOpContext(), rowSchemaList.get((byte)k));
-        ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
+        ((TableScanOperator)ts).setTableDescSkewJoin(tableDescList.get((byte)k));
         parentOps[k] = ts;
       }
       Operator<? extends OperatorDesc> tblScan_op = parentOps[i];

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 405c3ca..38bb847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -231,7 +231,7 @@ public class GenSparkSkewJoinProcessor {
       for (int k = 0; k < tags.length; k++) {
         Operator<? extends OperatorDesc> ts = GenMapRedUtils.createTemporaryTableScanOperator(
             joinOp.getCompilationOpContext(), rowSchemaList.get((byte) k));
-        ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
+        ((TableScanOperator) ts).setTableDescSkewJoin(tableDescList.get((byte) k));
         parentOps[k] = ts;
       }
 
@@ -362,7 +362,7 @@ public class GenSparkSkewJoinProcessor {
     HashTableDummyDesc desc = new HashTableDummyDesc();
     HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
         tableScan.getCompilationOpContext(), desc);
-    dummyOp.getConf().setTbl(tableScan.getTableDesc());
+    dummyOp.getConf().setTbl(tableScan.getTableDescSkewJoin());
     MapJoinOperator mapJoinOp = (MapJoinOperator) tableScan.getChildOperators().get(0);
     mapJoinOp.replaceParent(tableScan, dummyOp);
     List<Operator<? extends OperatorDesc>> mapJoinChildren =

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
index 9ca815c..af3175e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
@@ -209,7 +209,7 @@ public final class LocalMapJoinProcFactory {
 
         if (parent.getSchema() == null) {
           if (parent instanceof TableScanOperator) {
-            tbl = ((TableScanOperator) parent).getTableDesc();
+            tbl = ((TableScanOperator) parent).getTableDescSkewJoin();
           } else {
             throw new SemanticException("Expected parent operator of type TableScanOperator." +
               "Found " + parent.getClass().getName() + " instead.");

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
index 81e99fc..e036cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
@@ -118,7 +118,7 @@ public class IndexWhereProcessor implements NodeProcessor {
     // get potential reentrant index queries from each index
     Map<Index, HiveIndexQueryContext> queryContexts = new HashMap<Index, HiveIndexQueryContext>();
     // make sure we have an index on the table being scanned
-    TableDesc tblDesc = operator.getTableDesc();
+    TableDesc tblDesc = operator.getTableDescSkewJoin();
 
     Map<String, List<Index>> indexesByType = new HashMap<String, List<Index>>();
     for (Index indexOnTable : indexes) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
index bdb872a..7670b86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -39,6 +40,7 @@ public class AlterTablePartMergeFilesDesc {
   private List<Path> inputDir = new ArrayList<Path>();
   private Path outputDir = null;
   private Class<? extends InputFormat> inputFormatClass;
+  private TableDesc tableDesc;
 
   public AlterTablePartMergeFilesDesc(String tableName,
       HashMap<String, String> partSpec) {
@@ -102,4 +104,11 @@ public class AlterTablePartMergeFilesDesc {
     this.inputFormatClass = inputFormatClass;
   }
 
+  public void setTableDesc(TableDesc tableDesc) {
+    this.tableDesc = tableDesc;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index e6a31e8..150db52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1603,6 +1603,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (MetaStoreUtils.isInsertOnlyTable(tblObj.getParameters())) {
         throw new SemanticException("Merge is not supported for MM tables");
       }
+      mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
 
       List<String> bucketCols = null;
       Class<? extends InputFormat> inputFormatClass = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index 653b657..d3b4da1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.Driver;
@@ -43,6 +44,7 @@ import java.util.Set;
 public class IndexUpdater {
   private List<LoadTableDesc> loadTableWork;
   private HiveConf conf;
+  private Configuration parentConf;
   // Assumes one instance of this + single-threaded compilation for each query.
   private Hive hive;
   private List<Task<? extends Serializable>> tasks;
@@ -52,6 +54,7 @@ public class IndexUpdater {
   public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
     this.loadTableWork = loadTableWork;
     this.inputs = inputs;
+    this.parentConf = conf;
     this.conf = new HiveConf(conf, IndexUpdater.class);
     this.tasks = new LinkedList<Task<? extends Serializable>>();
   }
@@ -60,6 +63,7 @@ public class IndexUpdater {
       Configuration conf) {
     this.loadTableWork = new LinkedList<LoadTableDesc>();
     this.loadTableWork.add(loadTableWork);
+    this.parentConf = conf;
     this.conf = new HiveConf(conf, IndexUpdater.class);
     this.tasks = new LinkedList<Task<? extends Serializable>>();
     this.inputs = inputs;
@@ -75,16 +79,15 @@ public class IndexUpdater {
       Map<String, String> partSpec = ltd.getPartitionSpec();
       if (partSpec == null || partSpec.size() == 0) {
         //unpartitioned table, update whole index
-        doIndexUpdate(tblIndexes);
+        doIndexUpdate(tblIndexes, ltd.getMmWriteId());
       } else {
-        doIndexUpdate(tblIndexes, partSpec);
+        doIndexUpdate(tblIndexes, partSpec, ltd.getMmWriteId());
       }
     }
     return tasks;
   }
 
-  private void doIndexUpdate(List<Index> tblIndexes) throws HiveException {
-    Driver driver = new Driver(this.conf);
+  private void doIndexUpdate(List<Index> tblIndexes, Long mmWriteId) throws HiveException {
     for (Index idx : tblIndexes) {
       StringBuilder sb = new StringBuilder();
       sb.append("ALTER INDEX ");
@@ -93,23 +96,21 @@ public class IndexUpdater {
       sb.append(idx.getDbName()).append('.');
       sb.append(idx.getOrigTableName());
       sb.append(" REBUILD");
-      driver.compile(sb.toString(), false);
-      tasks.addAll(driver.getPlan().getRootTasks());
-      inputs.addAll(driver.getPlan().getInputs());
+      compileRebuild(sb.toString(), idx, mmWriteId);
     }
   }
 
   private void doIndexUpdate(List<Index> tblIndexes, Map<String, String>
-      partSpec) throws HiveException {
+      partSpec, Long mmWriteId) throws HiveException {
     for (Index index : tblIndexes) {
       if (containsPartition(index, partSpec)) {
-        doIndexUpdate(index, partSpec);
+        doIndexUpdate(index, partSpec, mmWriteId);
       }
     }
   }
 
-  private void doIndexUpdate(Index index, Map<String, String> partSpec) throws
-    HiveException {
+  private void doIndexUpdate(Index index, Map<String, String> partSpec, Long mmWriteId)
+      throws HiveException {
     StringBuilder ps = new StringBuilder();
     boolean first = true;
     ps.append("(");
@@ -133,14 +134,25 @@ public class IndexUpdater {
     sb.append(" PARTITION ");
     sb.append(ps.toString());
     sb.append(" REBUILD");
+    compileRebuild(sb.toString(), index, mmWriteId);
+  }
+
+  private void compileRebuild(String query, Index index, Long mmWriteId)
+      throws HiveException {
     Driver driver = new Driver(this.conf);
-    driver.compile(sb.toString(), false);
+    driver.compile(query, false);
+    if (mmWriteId != null) {
+      // TODO: this is rather fragile
+      ValidWriteIds.addCurrentToConf(
+          parentConf, index.getDbName(), index.getOrigTableName(), mmWriteId);
+    }
     tasks.addAll(driver.getPlan().getRootTasks());
     inputs.addAll(driver.getPlan().getInputs());
   }
 
-  private boolean containsPartition(Index index, Map<String, String> partSpec)
-      throws HiveException {
+
+  private boolean containsPartition(Index index,
+      Map<String, String> partSpec) throws HiveException {
     String[] qualified = Utilities.getDbTableName(index.getDbName(), index.getIndexTableName());
     Table indexTable = hive.getTable(qualified[0], qualified[1]);
     List<Partition> parts = hive.getPartitions(indexTable, partSpec);
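
The new compileRebuild() keeps two configurations deliberately apart: the ALTER INDEX ... REBUILD statement is compiled with the private HiveConf copy, while the enclosing statement's in-flight MM write ID is published on the parent Configuration via ValidWriteIds.addCurrentToConf(), so that the rebuild can read the rows that same statement is writing (the hunk itself marks this as fragile). A commented restatement of the method (assumes IndexUpdater's existing fields conf, parentConf, tasks and inputs):

    private void compileRebuild(String query, Index index, Long mmWriteId) throws HiveException {
      // Nested compilation of the rebuild statement uses the private conf copy...
      Driver driver = new Driver(this.conf);
      driver.compile(query, false);
      if (mmWriteId != null) {
        // ...but the in-flight write ID is published on the parent (query-level) conf,
        // where readers calling createFromConf() can pick up the ".current" entry.
        ValidWriteIds.addCurrentToConf(
            parentConf, index.getDbName(), index.getOrigTableName(), mmWriteId);
      }
      tasks.addAll(driver.getPlan().getRootTasks());
      inputs.addAll(driver.getPlan().getInputs());
    }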

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
index c13a404..41f471d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -179,7 +180,8 @@ public class ProcessAnalyzeTable implements NodeProcessor {
     aggregationKey = aggregationKeyBuffer.toString();
 
     // scan work
-    PartialScanWork scanWork = new PartialScanWork(inputPaths);
+    PartialScanWork scanWork = new PartialScanWork(inputPaths,
+        Utilities.getTableDesc(tableScan.getConf().getTableMetadata()));
     scanWork.setMapperCannotSpanPartns(true);
     scanWork.setAggKey(aggregationKey);
     scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dbeb8c6..0a196c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6698,6 +6698,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       List<FieldSchema> field_schemas = null;
       CreateTableDesc tblDesc = qb.getTableDesc();
       CreateViewDesc viewDesc = qb.getViewDesc();
+      boolean isCtas = false;
       if (tblDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
         destTableIsTemporary = tblDesc.isTemporary();
@@ -6838,7 +6839,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part,
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, mmWriteId);
+        canBeMerged, mmWriteId, isMmCtas);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       tableDesc.setWriter(fileSinkDesc);
@@ -6943,20 +6944,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       boolean destTableIsAcid, boolean destTableIsTemporary,
       boolean destTableIsMaterialization, Path queryTmpdir,
       SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
-      RowSchema fsRS, boolean canBeMerged, Long mmWriteId) throws SemanticException {
-    FileSinkDesc fileSinkDesc = new FileSinkDesc(
-      queryTmpdir,
-      table_desc,
-      conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
-      currentTableId,
-      rsCtx.isMultiFileSpray(),
-      canBeMerged,
-      rsCtx.getNumFiles(),
-      rsCtx.getTotalFiles(),
-      rsCtx.getPartnCols(),
-      dpCtx,
-      dest_path,
-      mmWriteId);
+      RowSchema fsRS, boolean canBeMerged, Long mmWriteId, boolean isMmCtas) throws SemanticException {
+    FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
+        conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
+        canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
+        dest_path, mmWriteId, isMmCtas);
 
     fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
     // If this is an insert, update, or delete on an ACID table then mark that so the

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index 52186b4..b48735a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -174,7 +175,8 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
     aggregationKey = aggregationKeyBuffer.toString();
 
     // scan work
-    PartialScanWork scanWork = new PartialScanWork(inputPaths);
+    PartialScanWork scanWork = new PartialScanWork(inputPaths,
+        Utilities.getTableDesc(tableScan.getConf().getTableMetadata()));
     scanWork.setMapperCannotSpanPartns(true);
     scanWork.setAggKey(aggregationKey);
     scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 1f84531..504a6ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -98,6 +98,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   private boolean isHiveServerQuery;
   private Long mmWriteId;
   private boolean isMerge;
+  private boolean isMmCtas;
 
   public FileSinkDesc() {
   }
@@ -109,7 +110,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId) {
+      Long mmWriteId, boolean isMmCtas) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -124,6 +125,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.dpSortState = DPSortState.NONE;
     this.destPath = destPath;
     this.mmWriteId = mmWriteId;
+    this.isMmCtas = isMmCtas;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -145,7 +147,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId);
+        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -500,4 +502,8 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public boolean isMerge() {
     return isMerge;
   }
+
+  public boolean isMmCtas() {
+    return isMmCtas;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 921461f..ee112d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -79,6 +79,8 @@ public class PartitionDesc implements Serializable, Cloneable {
   public PartitionDesc() {
   }
 
+  private final static org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(PartitionDesc.class);
+
   public PartitionDesc(final TableDesc table, final LinkedHashMap<String, String> partSpec) {
     this.tableDesc = table;
     this.partSpec = partSpec;
@@ -325,4 +327,13 @@ public class PartitionDesc implements Serializable, Cloneable {
   public VectorPartitionDesc getVectorPartitionDesc() {
     return vectorPartitionDesc;
   }
+
+  @Override
+  public String toString() {
+    return "PartitionDesc [tableDesc=" + tableDesc + ", partSpec=" + partSpec
+        + ", inputFileFormatClass=" + inputFileFormatClass
+        + ", outputFileFormatClass=" + outputFileFormatClass + ", properties="
+        + properties + ", baseFileName=" + baseFileName
+        + ", vectorPartitionDesc=" + vectorPartitionDesc + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 0a611f9..977b39f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -214,4 +214,11 @@ public class TableDesc implements Serializable, Cloneable {
       jobProperties.equals(target.jobProperties));
     return ret;
   }
+
+  @Override
+  public String toString() {
+    return "TableDesc [inputFileFormatClass=" + inputFileFormatClass
+        + ", outputFileFormatClass=" + outputFileFormatClass + ", properties="
+        + properties + ", jobProperties=" + jobProperties + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1155ed75/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 909114c..4938e2f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,8 @@ public class TestFileSinkOperator {
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, null);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
+          false, 1, 1, partCols, dpCtx, null, null, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }