Posted to commits@hive.apache.org by se...@apache.org on 2016/10/03 23:45:02 UTC

[5/5] hive git commit: HIVE-14641 : handle writing to dynamic partitions (Sergey Shelukhin)

HIVE-14641 : handle writing to dynamic partitions (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ad3df23b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ad3df23b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ad3df23b

Branch: refs/heads/hive-14535
Commit: ad3df23b9e9ecf0ecbee11b1a143658364b45e16
Parents: e02691b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Oct 3 16:43:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Oct 3 16:43:45 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 128 ++---
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   3 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   9 +-
 ql/src/test/queries/clientpositive/mm_all.q     | 162 ++++---
 ql/src/test/queries/clientpositive/mm_current.q |  37 +-
 .../results/clientpositive/llap/mm_all.q.out    | 467 +++++++++++++++++++
 .../clientpositive/llap/mm_current.q.out        | 235 +++-------
 7 files changed, 749 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 6ea1a98..f11a7c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -225,11 +225,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
     }
 
-    private void commit(FileSystem fs) throws HiveException {
-      List<Path> commitPaths = null;
-      if (isMmTable) {
-        commitPaths = new ArrayList<>();
-      }
+    private void commit(FileSystem fs, List<Path> commitPaths) throws HiveException {
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
           commitOneOutPath(idx, fs, commitPaths);
@@ -238,21 +234,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
               outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
-      if (isMmTable) {
-        Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
-            conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION);
-        Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
-        try {
-          try (FSDataOutputStream out = fs.create(manifestPath)) {
-            out.writeInt(commitPaths.size());
-            for (Path path : commitPaths) {
-              out.writeUTF(path.toString());
-            }
-          }
-        } catch (IOException e) {
-          throw new HiveException(e);
-        }
-      }
     }
 
     private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
@@ -328,8 +309,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
             finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension);
           } else {
-            // TODO# wrong! special case #N bucketing
-            finalPaths[filesIdx] = getFinalPath(subdirPath, specPath, extension);
+            // TODO# does this need extra special handling for bucketing?
+            // Note: tmpPath here has the correct partition key
+            finalPaths[filesIdx] = getFinalPath(subdirPath, tmpPath, extension);
           }
           outPaths[filesIdx] = finalPaths[filesIdx];
         }
@@ -921,7 +903,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); // TODO# this will break
     fsp2.configureDynPartPath(dirName, childSpecPathDynLinkedPartitions);
     Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec " + childSpecPathDynLinkedPartitions
-        + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath());
+        + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath()/*, new Exception()*/);
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
       valToPaths.put(dirName, fsp2);
@@ -1104,6 +1086,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
             throw new HiveException(e);
           }
       }
+      List<Path> commitPaths = new ArrayList<>();
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
         // before closing the operator check if statistics gathering is requested
@@ -1139,7 +1122,27 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
 
         if (isNativeTable) {
-          fsp.commit(fs);
+          fsp.commit(fs, commitPaths);
+        }
+      }
+      if (!commitPaths.isEmpty()) {
+        Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
+            conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION);
+        Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
+        try {
+          // Don't overwrite the manifest... should fail if we have collisions.
+          // We assume one FSOP per task (per specPath), so we create it in specPath.
+          try (FSDataOutputStream out = fs.create(manifestPath, false)) {
+            if (out == null) {
+              throw new HiveException("Failed to create manifest at " + manifestPath);
+            }
+            out.writeInt(commitPaths.size());
+            for (Path path : commitPaths) {
+              out.writeUTF(path.toString());
+            }
+          }
+        } catch (IOException e) {
+          throw new HiveException(e);
         }
       }
       // Only publish stats if this operator's flag was set to gather stats
@@ -1197,30 +1200,27 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
           throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
+    // Manifests would be at the root level, but the results at target level.
+    // TODO# special case - doesn't take bucketing into account
+    int targetLevel = (dpCtx == null) ? 1 : (dpCtx.getNumDPCols() + 1);
+    int manifestLevel = 1;
+    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true);
     if (!success) {
-      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
-          new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true));
-      for (FileStatus status : statuses) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
-      }
+      deleteMatchingFiles(specPath, fs, targetLevel, filter);
+      deleteMatchingFiles(specPath, fs, manifestLevel, filter);
       return;
     }
-    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
-        new ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true));
-    if (statuses == null) return;
-    LinkedList<FileStatus> results = new LinkedList<>();
-    List<Path> manifests = new ArrayList<>(statuses.length);
-    for (FileStatus status : statuses) {
-      if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
-        manifests.add(status.getPath());
-      } else if (!status.isDirectory()) {
-        Path path = status.getPath();
-        Utilities.LOG14535.warn("Unknown file found - neither a manifest nor directory: " + path);
-        tryDelete(fs, path);
-      } else {
-        results.addAll(Lists.newArrayList(fs.listStatus(status.getPath())));
+    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(specPath, manifestLevel, fs, filter);
+    List<Path> manifests = new ArrayList<>(files == null ? 0 : files.length);
+    if (files != null) {
+      for (FileStatus status : files) {
+        if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
+          manifests.add(status.getPath());
+        } else if (!status.isDirectory()) {
+          Path path = status.getPath();
+          Utilities.LOG14535.warn("Unknown file found - neither a manifest nor directory: " + path);
+          tryDelete(fs, path);
+        }
       }
     }
     HashSet<String> committed = new HashSet<>();
@@ -1235,18 +1235,27 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
     }
-    Iterator<FileStatus> iter = results.iterator();
-    while (iter.hasNext()) {
-      FileStatus rfs = iter.next();
-      if (!committed.remove(rfs.getPath().toString())) {
-        iter.remove();
-        Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not committed");
-        // We should actually succeed here - if we fail, don't commit the query.
-        if (!fs.delete(rfs.getPath(), true)) {
-          throw new HiveException("Failed to delete an uncommitted path " + rfs.getPath());
+
+    files = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs, filter);
+    LinkedList<FileStatus> results = new LinkedList<>();
+    for (FileStatus status : files) {
+      if (!status.isDirectory()) {
+        Path path = status.getPath();
+        Utilities.LOG14535.warn("Unknown file found - neither a manifest nor directory: " + path);
+        tryDelete(fs, path);
+      } else {
+        for (FileStatus child : fs.listStatus(status.getPath())) {
+          Path path = child.getPath();
+          if (committed.remove(path.toString())) continue; // A good file.
+          Utilities.LOG14535.info("Deleting " + path + " that was not committed");
+          // We should actually succeed here - if we fail, don't commit the query.
+          if (!fs.delete(path, true)) {
+            throw new HiveException("Failed to delete an uncommitted path " + path);
+          }
         }
       }
     }
+
     if (!committed.isEmpty()) {
       throw new HiveException("The following files were committed but not found: " + committed);
     }
@@ -1258,7 +1267,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     if (results.isEmpty()) return;
     FileStatus[] finalResults = results.toArray(new FileStatus[results.size()]);
 
-    // TODO# dp will break - removeTempOrDuplicateFiles assumes dirs in results. Why? We recurse...
+    // TODO# dp may break - removeTempOrDuplicateFiles assumes dirs in results. Why? We recurse...
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
         fs, finalResults, dpCtx, conf, hconf);
     // create empty buckets if necessary
@@ -1267,6 +1276,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
+  private void deleteMatchingFiles(Path specPath, FileSystem fs,
+      int targetLevel, ValidWriteIds.IdPathFilter filter) throws IOException {
+    for (FileStatus status : HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs,
+        filter)) {
+      Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
+      tryDelete(fs, status.getPath());
+    }
+  }
+
   private void tryDelete(FileSystem fs, Path path) {
     try {
       fs.delete(path, true);
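
For reference, the manifest that closeOp now writes above is just an int count followed by writeUTF-encoded path strings, created with fs.create(manifestPath, false) so collisions fail. A minimal sketch of a reader for that format follows; the class and method names (ManifestReaderSketch, readManifestPaths) are illustrative only and not part of this patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManifestReaderSketch {
  // Decodes one manifest: an int count, then that many writeUTF path strings.
  static List<Path> readManifestPaths(FileSystem fs, Path manifestPath) throws IOException {
    List<Path> paths = new ArrayList<>();
    try (FSDataInputStream in = fs.open(manifestPath)) {
      int count = in.readInt();
      for (int i = 0; i < count; ++i) {
        paths.add(new Path(in.readUTF()));
      }
    }
    return paths;
  }
}

A consumer such as the cleanup path above would add these paths to the committed set and delete anything under the target level that is not in it.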

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 3be21c4..538bf79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -435,7 +435,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         isSkewedStoredAsDirs(tbd),
         work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
         SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
-        work.getLoadTableWork().getWriteType());
+        work.getLoadTableWork().getWriteType(),
+        tbd.getMmWriteId());
 
     console.printInfo("\t Time taken to load dynamic partitions: "  +
         (System.currentTimeMillis() - startTime)/1000.0 + " seconds");

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 68d59aa..38b434d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1850,7 +1850,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
       final String tableName, final Map<String, String> partSpec, final boolean replace,
       final int numDP, final boolean listBucketingEnabled, final boolean isAcid, final long txnId,
-      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation, final Long mmWriteId)
       throws HiveException {
 
     final Map<Map<String, String>, Partition> partitionsMap =
@@ -1895,7 +1895,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
               Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                   replace, true, listBucketingEnabled,
-                  false, isAcid, hasFollowingStatsTask, null); // TODO# special case #N
+                  false, isAcid, hasFollowingStatsTask, mmWriteId);
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -1927,7 +1927,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (Future future : futures) {
         future.get();
       }
-      // TODO# special case #N - DP - we would commit the txn to metastore here
+      if (mmWriteId != null) {
+        // Commit after we have processed all the partitions.
+        commitMmTableWrite(tbl, mmWriteId);
+      }
     } catch (InterruptedException | ExecutionException e) {
       LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
       //cancel other futures
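
The loadDynamicPartitions change above defers the MM commit until every per-partition load future has completed. Below is a minimal sketch of that ordering, assuming a plain executor; PartitionLoader, Committer, and the method names are hypothetical stand-ins for loadPartition and commitMmTableWrite, not the actual Hive API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CommitAfterAllLoadsSketch {
  interface PartitionLoader { void loadOnePartition(String partSpec) throws Exception; }
  interface Committer { void commitWrite(long mmWriteId); }

  static void loadAndCommit(List<String> partSpecs, PartitionLoader loader,
      Committer committer, Long mmWriteId) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> futures = new ArrayList<>();
    for (String spec : partSpecs) {
      // One future per dynamic partition, mirroring the pool in loadDynamicPartitions.
      futures.add(pool.submit(() -> { loader.loadOnePartition(spec); return null; }));
    }
    try {
      for (Future<?> f : futures) {
        f.get(); // surface any per-partition failure before committing
      }
      if (mmWriteId != null) {
        committer.commitWrite(mmWriteId); // commit once, only after all partitions loaded
      }
    } finally {
      pool.shutdown();
    }
  }
}

Failing any future before the commit keeps a partially loaded MM write invisible, which appears to be the intent of committing once after the loop.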

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q
index aaf8d48..59171af 100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -1,63 +1,117 @@
 set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
-set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.fetch.task.conversion=none;
 
-drop table simple_mm;
-drop table partunion_mm;
-drop table merge_mm;
-drop table ctas_mm;
-drop table T1;
-drop table T2;
-drop table skew_mm;
-
-
-create table simple_mm(key int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
-insert into table simple_mm partition(key_mm='455') select key from src limit 3;
-
-create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
-
-create table partunion_mm(id_mm int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
-
-
-insert into table partunion_mm partition(key_mm)
-select temps.* from (
-select key as key_mm, key from ctas_mm 
-union all 
-select key as key_mm, key from simple_mm ) temps;
+-- Force multiple writers when reading
+drop table intermediate;
+create table intermediate(key int) partitioned by (p int) stored as orc;
+insert into table intermediate partition(p='455') select key from src limit 2;
+insert into table intermediate partition(p='456') select key from src limit 2;
+
+drop table part_mm;
+create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
+explain insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+insert into table part_mm partition(key_mm='456') select key from intermediate;
+insert into table part_mm partition(key_mm='455') select key from intermediate;
+select * from part_mm;
+drop table part_mm;
 
-set hive.merge.mapredfiles=true;
-set hive.merge.sparkfiles=true;
-set hive.merge.tezfiles=true;
-
-CREATE TABLE merge_mm (key INT, value STRING) 
-    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
-
-EXPLAIN
-INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
-        SELECT key, value, PMOD(HASH(key), 2) as part
-        FROM src;
-
-INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
-        SELECT key, value, PMOD(HASH(key), 2) as part
-        FROM src;
-
-
-set hive.optimize.skewjoin.compiletime = true;
--- the test case is wrong?
+drop table simple_mm;
+create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true');
+insert into table simple_mm select key from intermediate;
+insert overwrite table simple_mm select key from intermediate;
+select * from simple_mm;
+drop table simple_mm;
 
-CREATE TABLE T1(key STRING, val STRING)
-SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
-LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
-CREATE TABLE T2(key STRING, val STRING)
-SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
-LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
 
-EXPLAIN
-SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+-- simple DP (no bucketing, no sorting?)
+drop table dp_no_mm;
+drop table dp_mm;
 
-create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
-INSERT OVERWRITE TABLE skew_mm
-SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+set hive.exec.dynamic.partition.mode=nonstrict;
 
--- TODO load, acid, etc
+set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
+set hive.merge.tezfiles=false;
+
+create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc;
+create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true');
+
+insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate;
+
+insert into table dp_mm partition (key1='123', key2) select key, key from intermediate;
+
+select * from dp_no_mm;
+select * from dp_mm;
+
+drop table dp_no_mm;
+drop table dp_mm;
+
+
+
+-- future
+
+
+
+
+
+--drop table partunion_mm;
+--drop table merge_mm;
+--drop table ctas_mm;
+--drop table T1;
+--drop table T2;
+--drop table skew_mm;
+--
+--
+--create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
+--
+--create table partunion_mm(id_mm int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
+--
+--
+--insert into table partunion_mm partition(key_mm)
+--select temps.* from (
+--select key as key_mm, key from ctas_mm 
+--union all 
+--select key as key_mm, key from simple_mm ) temps;
+--
+--set hive.merge.mapredfiles=true;
+--set hive.merge.sparkfiles=true;
+--set hive.merge.tezfiles=true;
+--
+--CREATE TABLE merge_mm (key INT, value STRING) 
+--    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
+--
+--EXPLAIN
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--
+--set hive.optimize.skewjoin.compiletime = true;
+---- the test case is wrong?
+--
+--CREATE TABLE T1(key STRING, val STRING)
+--SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+--CREATE TABLE T2(key STRING, val STRING)
+--SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+--
+--EXPLAIN
+--SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+--create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
+--INSERT OVERWRITE TABLE skew_mm
+--SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+---- TODO load, acid, etc
+--
+--
+
+drop table intermediate;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index 7c3e138..b551176 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -6,29 +6,34 @@ set tez.grouping.min-size=1;
 set tez.grouping.max-size=2;
 set hive.tez.auto.reducer.parallelism=false;
 
-drop table part_mm;
-drop table simple_mm;
 drop table intermediate;
-
 create table intermediate(key int) partitioned by (p int) stored as orc;
 insert into table intermediate partition(p='455') select key from src limit 2;
 insert into table intermediate partition(p='456') select key from src limit 2;
-  
-create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true');
 
-explain insert into table part_mm partition(key_mm='455') select key from intermediate;
-insert into table part_mm partition(key_mm='455') select key from intermediate;
-insert into table part_mm partition(key_mm='456') select key from intermediate;
-insert into table part_mm partition(key_mm='455') select key from intermediate;
-select * from part_mm;
 
-create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true');
-insert into table simple_mm select key from intermediate;
-insert overwrite table simple_mm select key from intermediate;
 
-select * from simple_mm;
+drop table dp_no_mm;
+drop table dp_mm;
+
+set hive.merge.mapredfiles=false;
+set hive.merge.sparkfiles=false;
+set hive.merge.tezfiles=false;
+
+create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc;
+create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true');
+
+insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate;
+
+insert into table dp_mm partition (key1='123', key2) select key, key from intermediate;
+
+select * from dp_no_mm;
+select * from dp_mm;
+
+drop table dp_no_mm;
+drop table dp_mm;
 
-drop table part_mm;
-drop table simple_mm;
 drop table intermediate;
 
+

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/test/results/clientpositive/llap/mm_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out
new file mode 100644
index 0000000..b0c9c0a
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -0,0 +1,467 @@
+PREHOOK: query: -- Force multiple writers when reading
+drop table intermediate
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- Force multiple writers when reading
+drop table intermediate
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: create table intermediate(key int) partitioned by (p int) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@intermediate
+PREHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=455
+POSTHOOK: query: insert into table intermediate partition(p='455') select key from src limit 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=455
+POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=456
+POSTHOOK: query: insert into table intermediate partition(p='456') select key from src limit 2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=456
+POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part_mm
+PREHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+POSTHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: intermediate
+                  Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: int)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.part_mm
+            Execution mode: llap
+            LLAP IO: all inputs
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            key_mm 455
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.part_mm
+          micromanaged table: true
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=456
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@part_mm@key_mm=455
+POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from part_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@part_mm
+PREHOOK: Input: default@part_mm@key_mm=455
+PREHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+POSTHOOK: query: select * from part_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Input: default@part_mm@key_mm=455
+POSTHOOK: Input: default@part_mm@key_mm=456
+#### A masked pattern was here ####
+238	455
+86	455
+238	455
+86	455
+238	455
+86	455
+238	455
+86	455
+238	456
+86	456
+238	456
+86	456
+PREHOOK: query: drop table part_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part_mm
+PREHOOK: Output: default@part_mm
+POSTHOOK: query: drop table part_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part_mm
+POSTHOOK: Output: default@part_mm
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: insert into table simple_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert overwrite table simple_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert overwrite table simple_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+238
+86
+238
+86
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_mm
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_mm
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: -- simple DP (no bucketing, no sorting?)
+drop table dp_no_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- simple DP (no bucketing, no sorting?)
+drop table dp_no_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table dp_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table dp_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dp_no_mm
+POSTHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dp_no_mm
+PREHOOK: query: create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dp_mm
+POSTHOOK: query: create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dp_mm
+PREHOOK: query: insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@dp_no_mm@key1=123
+POSTHOOK: query: insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=238
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=86
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=238).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=86).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table dp_mm partition (key1='123', key2) select key, key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@dp_mm@key1=123
+POSTHOOK: query: insert into table dp_mm partition (key1='123', key2) select key, key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@dp_mm@key1=123/key2=238
+POSTHOOK: Output: default@dp_mm@key1=123/key2=86
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=238).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=86).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from dp_no_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dp_no_mm
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=238
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=86
+#### A masked pattern was here ####
+POSTHOOK: query: select * from dp_no_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dp_no_mm
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=238
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=86
+#### A masked pattern was here ####
+238	123	238
+238	123	238
+86	123	86
+86	123	86
+PREHOOK: query: select * from dp_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dp_mm
+PREHOOK: Input: default@dp_mm@key1=123/key2=238
+PREHOOK: Input: default@dp_mm@key1=123/key2=86
+#### A masked pattern was here ####
+POSTHOOK: query: select * from dp_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dp_mm
+POSTHOOK: Input: default@dp_mm@key1=123/key2=238
+POSTHOOK: Input: default@dp_mm@key1=123/key2=86
+#### A masked pattern was here ####
+238	123	238
+238	123	238
+86	123	86
+86	123	86
+PREHOOK: query: drop table dp_no_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dp_no_mm
+PREHOOK: Output: default@dp_no_mm
+POSTHOOK: query: drop table dp_no_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dp_no_mm
+POSTHOOK: Output: default@dp_no_mm
+PREHOOK: query: drop table dp_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@dp_mm
+PREHOOK: Output: default@dp_mm
+POSTHOOK: query: drop table dp_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@dp_mm
+POSTHOOK: Output: default@dp_mm
+PREHOOK: query: -- future
+
+
+
+
+
+--drop table partunion_mm;
+--drop table merge_mm;
+--drop table ctas_mm;
+--drop table T1;
+--drop table T2;
+--drop table skew_mm;
+--
+--
+--create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
+--
+--create table partunion_mm(id_mm int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
+--
+--
+--insert into table partunion_mm partition(key_mm)
+--select temps.* from (
+--select key as key_mm, key from ctas_mm 
+--union all 
+--select key as key_mm, key from simple_mm ) temps;
+--
+--set hive.merge.mapredfiles=true;
+--set hive.merge.sparkfiles=true;
+--set hive.merge.tezfiles=true;
+--
+--CREATE TABLE merge_mm (key INT, value STRING) 
+--    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
+--
+--EXPLAIN
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--
+--set hive.optimize.skewjoin.compiletime = true;
+---- the test case is wrong?
+--
+--CREATE TABLE T1(key STRING, val STRING)
+--SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+--CREATE TABLE T2(key STRING, val STRING)
+--SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+--
+--EXPLAIN
+--SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+--create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
+--INSERT OVERWRITE TABLE skew_mm
+--SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+---- TODO load, acid, etc
+--
+--
+
+drop table intermediate
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@intermediate
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: -- future
+
+
+
+
+
+--drop table partunion_mm;
+--drop table merge_mm;
+--drop table ctas_mm;
+--drop table T1;
+--drop table T2;
+--drop table skew_mm;
+--
+--
+--create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
+--
+--create table partunion_mm(id_mm int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
+--
+--
+--insert into table partunion_mm partition(key_mm)
+--select temps.* from (
+--select key as key_mm, key from ctas_mm 
+--union all 
+--select key as key_mm, key from simple_mm ) temps;
+--
+--set hive.merge.mapredfiles=true;
+--set hive.merge.sparkfiles=true;
+--set hive.merge.tezfiles=true;
+--
+--CREATE TABLE merge_mm (key INT, value STRING) 
+--    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
+--
+--EXPLAIN
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+--        SELECT key, value, PMOD(HASH(key), 2) as part
+--        FROM src;
+--
+--
+--set hive.optimize.skewjoin.compiletime = true;
+---- the test case is wrong?
+--
+--CREATE TABLE T1(key STRING, val STRING)
+--SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+--CREATE TABLE T2(key STRING, val STRING)
+--SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+--LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+--
+--EXPLAIN
+--SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+--create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
+--INSERT OVERWRITE TABLE skew_mm
+--SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+--
+---- TODO load, acid, etc
+--
+--
+
+drop table intermediate
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Output: default@intermediate

http://git-wip-us.apache.org/repos/asf/hive/blob/ad3df23b/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index ece6cbf..fe1caee 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,11 +1,3 @@
-PREHOOK: query: drop table part_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table part_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: drop table simple_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple_mm
-POSTHOOK: type: DROPTABLE
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table intermediate
@@ -36,193 +28,110 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@intermediate@p=456
 POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: query: drop table dp_no_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table dp_no_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table dp_mm
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table dp_mm
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@part_mm
-POSTHOOK: query: create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@dp_no_mm
+POSTHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored as orc
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@part_mm
-PREHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
-PREHOOK: type: QUERY
-POSTHOOK: query: explain insert into table part_mm partition(key_mm='455') select key from intermediate
-POSTHOOK: type: QUERY
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-0
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
-#### A masked pattern was here ####
-      Vertices:
-        Map 1 
-            Map Operator Tree:
-                TableScan
-                  alias: intermediate
-                  Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: int)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
-                    File Output Operator
-                      compressed: false
-                      Statistics: Num rows: 4 Data size: 48 Basic stats: COMPLETE Column stats: NONE
-                      table:
-                          input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                          name: default.part_mm
-            Execution mode: llap
-            LLAP IO: all inputs
-
-  Stage: Stage-2
-    Dependency Collection
-
-  Stage: Stage-0
-    Move Operator
-      tables:
-          partition:
-            key_mm 455
-          replace: false
-          table:
-              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-              name: default.part_mm
-          micromanaged table: true
-
-  Stage: Stage-3
-    Stats-Aggr Operator
-
-PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@part_mm@key_mm=455
-POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@part_mm@key_mm=455
-POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@part_mm@key_mm=456
-POSTHOOK: query: insert into table part_mm partition(key_mm='456') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@part_mm@key_mm=456
-POSTHOOK: Lineage: part_mm PARTITION(key_mm=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
-PREHOOK: type: QUERY
-PREHOOK: Input: default@intermediate
-PREHOOK: Input: default@intermediate@p=455
-PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@part_mm@key_mm=455
-POSTHOOK: query: insert into table part_mm partition(key_mm='455') select key from intermediate
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@intermediate
-POSTHOOK: Input: default@intermediate@p=455
-POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@part_mm@key_mm=455
-POSTHOOK: Lineage: part_mm PARTITION(key_mm=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from part_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@part_mm
-PREHOOK: Input: default@part_mm@key_mm=455
-PREHOOK: Input: default@part_mm@key_mm=456
-#### A masked pattern was here ####
-POSTHOOK: query: select * from part_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@part_mm
-POSTHOOK: Input: default@part_mm@key_mm=455
-POSTHOOK: Input: default@part_mm@key_mm=456
-#### A masked pattern was here ####
-0	455
-455	455
-0	455
-455	455
-0	455
-455	455
-0	455
-455	455
-0	456
-455	456
-0	456
-455	456
-PREHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: Output: default@dp_no_mm
+PREHOOK: query: create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true')
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@dp_mm
+POSTHOOK: query: create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc
+  tblproperties ('hivecommit'='true')
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple_mm
-PREHOOK: query: insert into table simple_mm select key from intermediate
+POSTHOOK: Output: default@dp_mm
+PREHOOK: query: insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
 PREHOOK: Input: default@intermediate@p=455
 PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: insert into table simple_mm select key from intermediate
+PREHOOK: Output: default@dp_no_mm@key1=123
+POSTHOOK: query: insert into table dp_no_mm partition (key1='123', key2) select key, key from intermediate
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@simple_mm
-POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert overwrite table simple_mm select key from intermediate
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=0
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=455
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=0).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: insert into table dp_mm partition (key1='123', key2) select key, key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
 PREHOOK: Input: default@intermediate@p=455
 PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: insert overwrite table simple_mm select key from intermediate
+PREHOOK: Output: default@dp_mm@key1=123
+POSTHOOK: query: insert into table dp_mm partition (key1='123', key2) select key, key from intermediate
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@simple_mm
-POSTHOOK: Lineage: simple_mm.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from simple_mm
+POSTHOOK: Output: default@dp_mm@key1=123/key2=0
+POSTHOOK: Output: default@dp_mm@key1=123/key2=455
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=0).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from dp_no_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dp_no_mm
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=0
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=455
+#### A masked pattern was here ####
+POSTHOOK: query: select * from dp_no_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dp_no_mm
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=0
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=455
+#### A masked pattern was here ####
+455	123	455
+455	123	455
+0	123	0
+0	123	0
+PREHOOK: query: select * from dp_mm
 PREHOOK: type: QUERY
-PREHOOK: Input: default@simple_mm
+PREHOOK: Input: default@dp_mm
+PREHOOK: Input: default@dp_mm@key1=123/key2=0
+PREHOOK: Input: default@dp_mm@key1=123/key2=455
 #### A masked pattern was here ####
-POSTHOOK: query: select * from simple_mm
+POSTHOOK: query: select * from dp_mm
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@simple_mm
+POSTHOOK: Input: default@dp_mm
+POSTHOOK: Input: default@dp_mm@key1=123/key2=0
+POSTHOOK: Input: default@dp_mm@key1=123/key2=455
 #### A masked pattern was here ####
-0
-455
-0
-455
-PREHOOK: query: drop table part_mm
+455	123	455
+455	123	455
+0	123	0
+0	123	0
+PREHOOK: query: drop table dp_no_mm
 PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@part_mm
-PREHOOK: Output: default@part_mm
-POSTHOOK: query: drop table part_mm
+PREHOOK: Input: default@dp_no_mm
+PREHOOK: Output: default@dp_no_mm
+POSTHOOK: query: drop table dp_no_mm
 POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@part_mm
-POSTHOOK: Output: default@part_mm
-PREHOOK: query: drop table simple_mm
+POSTHOOK: Input: default@dp_no_mm
+POSTHOOK: Output: default@dp_no_mm
+PREHOOK: query: drop table dp_mm
 PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@simple_mm
-PREHOOK: Output: default@simple_mm
-POSTHOOK: query: drop table simple_mm
+PREHOOK: Input: default@dp_mm
+PREHOOK: Output: default@dp_mm
+POSTHOOK: query: drop table dp_mm
 POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@simple_mm
-POSTHOOK: Output: default@simple_mm
+POSTHOOK: Input: default@dp_mm
+POSTHOOK: Output: default@dp_mm
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate