You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/05/07 20:21:03 UTC

hive git commit: HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master 0fd74db20 -> 7ebcdeb95


HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7ebcdeb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7ebcdeb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7ebcdeb9

Branch: refs/heads/master
Commit: 7ebcdeb951fca86c1fb4920553ff9f2d3687627a
Parents: 0fd74db
Author: sergey <se...@apache.org>
Authored: Mon May 7 13:20:51 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Mon May 7 13:20:51 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/CopyTask.java    |  49 ++------
 .../apache/hadoop/hive/ql/exec/ExportTask.java  |  10 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |   3 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   6 +-
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |  45 ++++---
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  34 ++----
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |   2 +-
 .../ql/parse/repl/dump/PartitionExport.java     |   9 +-
 .../hive/ql/parse/repl/dump/TableExport.java    |  11 +-
 .../ql/parse/repl/dump/io/FileOperations.java   |  84 +++++++++++--
 .../apache/hadoop/hive/ql/plan/CopyWork.java    |  18 ---
 .../apache/hadoop/hive/ql/plan/ExportWork.java  |  60 +++++----
 .../apache/hadoop/hive/ql/TestTxnCommands.java  | 122 +++++++++++++++++--
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java  |   9 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |   3 +-
 ql/src/test/queries/clientpositive/mm_exim.q    |   4 +-
 .../results/clientpositive/llap/mm_exim.q.out   |  66 +++++++++-
 17 files changed, 382 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index b0ec5ab..1a8e5e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -63,14 +63,25 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
   protected int copyOnePath(Path fromPath, Path toPath) {
     FileSystem dstFs = null;
     try {
-      Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath);
+      Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath);
       console.printInfo("Copying data from " + fromPath.toString(), " to "
           + toPath.toString());
 
       FileSystem srcFs = fromPath.getFileSystem(conf);
       dstFs = toPath.getFileSystem(conf);
 
-      FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs());
+      FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter());
+
+      // TODO: this is very brittle given that Hive supports nested directories in the tables.
+      //       The caller should pass a flag explicitly telling us if the directories in the
+      //       input are data, or parent of data. For now, retain this for backward compat.
+      if (srcs != null && srcs.length == 1 && srcs[0].isDirectory()
+          /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) -  still broken for partitions*/) {
+        Utilities.FILE_OP_LOGGER.debug(
+            "Recursing into a single child directory {}", srcs[0].getPath().getName());
+        srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+      }
+
       if (srcs == null || srcs.length == 0) {
         if (work.isErrorOnSrcEmpty()) {
           console.printError("No files matching path: " + fromPath.toString());
@@ -107,40 +118,6 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
     }
   }
 
-  // Note: initially copied from LoadSemanticAnalyzer.
-  private static FileStatus[] matchFilesOrDir(
-      FileSystem fs, Path path, boolean isSourceMm) throws IOException {
-    if (!fs.exists(path)) return null;
-    if (!isSourceMm) return matchFilesOneDir(fs, path, null);
-    // Note: this doesn't handle list bucketing properly; neither does the original code.
-    FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter());
-    if (mmDirs == null || mmDirs.length == 0) return null;
-    List<FileStatus> allFiles = new ArrayList<FileStatus>();
-    for (FileStatus mmDir : mmDirs) {
-      if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-        Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath());
-      }
-      matchFilesOneDir(fs, mmDir.getPath(), allFiles);
-    }
-    return allFiles.toArray(new FileStatus[allFiles.size()]);
-  }
-
-  private static FileStatus[] matchFilesOneDir(
-      FileSystem fs, Path path, List<FileStatus> result) throws IOException {
-    FileStatus[] srcs = fs.globStatus(path, new EximPathFilter());
-    if (srcs != null && srcs.length == 1) {
-      if (srcs[0].isDirectory()) {
-        srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-      }
-    }
-    if (result != null && srcs != null) {
-      for (int i = 0; i < srcs.length; ++i) {
-        result.add(srcs[i]);
-      }
-    }
-    return srcs;
-  }
-
   private static final class EximPathFilter implements PathFilter {
     @Override
     public boolean accept(Path p) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index aba6591..119d792 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -47,15 +47,13 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
   protected int execute(DriverContext driverContext) {
     try {
       // Also creates the root directory
-      TableExport.Paths exportPaths =
-          new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(),
-              conf, false);
+      TableExport.Paths exportPaths = new TableExport.Paths(
+          work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false);
       Hive db = getHive();
       LOG.debug("Exporting data to: {}", exportPaths.getExportRootDir());
       work.acidPostProcess(db);
-      TableExport tableExport = new TableExport(
-          exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf
-      );
+      TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
+          work.getReplicationSpec(), db, null, conf, work.getMmContext());
       if (!tableExport.write()) {
         throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg());
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 31846a3..406bea0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -72,6 +73,7 @@ import java.util.regex.Pattern;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.commons.codec.binary.Base64;
@@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 88d352b..ccdf04a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -288,7 +289,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
         tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
       }
-      new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
+      MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+      new TableExport(
+          exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write();
+
 
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
     } catch (InvalidTableException te) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index d3c62a2..4a366a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -19,12 +19,17 @@
 package org.apache.hadoop.hive.ql.parse;
 
 
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
 import org.apache.hadoop.hive.ql.plan.ExportWork;
-
-import javax.annotation.Nullable;
-import java.util.Set;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 
 /**
  * ExportSemanticAnalyzer.
  *
  */
 public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
+  private boolean isMmExport = false;
 
   ExportSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -48,7 +52,9 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-    rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs));
+    Task<ExportWork> task = analyzeExport(ast, null, db, conf, inputs, outputs);
+    isMmExport = task.getWork().getMmContext() != null;
+    rootTasks.add(task);
   }
   /**
    * @param acidTableName - table name in db.table format; not NULL if exporting Acid table
@@ -80,12 +86,10 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     try {
       ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true);
-    } catch (SemanticException sme){
-      if ((replicationSpec.isInReplicationScope()) &&
-            ((sme.getCause() instanceof InvalidTableException)
-            || (sme instanceof Table.ValidationFailureSemanticException)
-            )
-          ){
+    } catch (SemanticException sme) {
+      if (!replicationSpec.isInReplicationScope()) throw sme;
+      if ((sme.getCause() instanceof InvalidTableException)
+            || (sme instanceof Table.ValidationFailureSemanticException)) {
         // If we're in replication scope, it's possible that we're running the export long after
         // the table was dropped, so the table not existing currently or being a different kind of
         // table is not an error - it simply means we should no-op, and let a future export
@@ -101,15 +105,26 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     // All parsing is done, we're now good to start the export process
     TableExport.Paths exportPaths =
         new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false);
-    TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf);
-    TableExport.AuthEntities authEntities = tableExport.getAuthEntities();
+    // Note: this tableExport is actually never used other than for auth, and another one is
+    //       created when the task is executed. So, we don't care about the correct MM state here.
+    TableExport.AuthEntities authEntities = new TableExport(
+        exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
     String exportRootDirName = tmpPath;
+    MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle);
+
+    Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}",
+        ts == null ? null : ts.tableName, mmCtx);
     // Configure export work
-    ExportWork exportWork =
-        new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName);
+    ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec,
+        ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx);
     // Create an export task and add it as a root task
-    return  TaskFactory.get(exportWork);
+    return TaskFactory.get(exportWork);
+  }
+  
+  @Override
+  public boolean hasTransactionalInQuery() {
+    return isMmExport; // Full ACID export goes thru UpdateDelete analyzer.
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e6a7012..e597872 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -382,7 +382,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
-      Long writeId, int stmtId, boolean isSourceMm) {
+      Long writeId, int stmtId) {
     assert table != null;
     assert table.getParameters() != null;
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
@@ -423,9 +423,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     if (replicationSpec.isInReplicationScope()) {
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
     } else {
-      CopyWork cw = new CopyWork(dataPath, destPath, false);
-      cw.setSkipSourceMmDirs(isSourceMm);
-      copyTask = TaskFactory.get(cw);
+      copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
@@ -480,7 +478,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@ -517,9 +515,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         copyTask = ReplCopyTask.getLoadCopyTask(
             replicationSpec, new Path(srcLocation), destPath, x.getConf());
       } else {
-        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
-        cw.setSkipSourceMmDirs(isSourceMm);
-        copyTask = TaskFactory.get(cw);
+        copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
 
       Task<?> addPartTask = TaskFactory.get(
@@ -830,8 +826,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws HiveException, IOException, MetaException {
 
-    final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
-
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
@@ -841,7 +835,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -853,8 +847,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG());
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId,
-            isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -873,7 +866,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (isPartitioned(tblDesc)) {
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-            replicationSpec, x, writeId, stmtId, isSourceMm));
+            replicationSpec, x, writeId, stmtId));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -891,7 +884,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG());
           t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x,
-              writeId, stmtId, isSourceMm));
+              writeId, stmtId));
         }
       }
       x.getTasks().add(t);
@@ -923,7 +916,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     Task<?> dropTblTask = null;
-    final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps());
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -1007,14 +999,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
             if (updatedMetadata != null) {
               updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
             }
           }
         } else {
           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
         }
       }
 
@@ -1037,7 +1029,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
               if (updatedMetadata != null) {
                 updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
               }
@@ -1054,7 +1046,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             if (replicationSpec.allowReplacementInto(ptn.getParameters())){
               if (!replicationSpec.isMetadataOnly()){
                 x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
               } else {
                 x.getTasks().add(alterSinglePartition(
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
@@ -1080,7 +1072,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
           loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(),
-            replicationSpec, x, writeId, stmtId, isSourceMm);
+            replicationSpec, x, writeId, stmtId);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 8200463..70295da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -215,7 +215,7 @@ public final class SemanticAnalyzerFactory {
       case HiveParser.TOK_LOAD:
         return new LoadSemanticAnalyzer(queryState);
       case HiveParser.TOK_EXPORT:
-        if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
+        if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) {
           return new UpdateDeleteSemanticAnalyzer(queryState);
         }
         return new ExportSemanticAnalyzer(queryState);

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 70eb750..d73fc4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,8 @@ import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
  * it has a blocking queue that stores partitions to be dumped via a producer thread.
  * it has a worker thread pool that reads of the queue to perform the various tasks.
  */
+// TODO: this object is created once to call one method and then immediately destroyed.
+//       So it's basically just a roundabout way to pass arguments to a static method. Simplify?
 class PartitionExport {
   private final Paths paths;
   private final PartitionIterable partitionIterable;
@@ -50,16 +53,18 @@ class PartitionExport {
   private final HiveConf hiveConf;
   private final int nThreads;
   private final SessionState callersSession;
+  private final MmContext mmCtx;
 
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;
 
   PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
-      HiveConf hiveConf) {
+      HiveConf hiveConf, MmContext mmCtx) {
     this.paths = paths;
     this.partitionIterable = partitionIterable;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
+    this.mmCtx = mmCtx;
     this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
     this.queue = new ArrayBlockingQueue<>(2 * nThreads);
     this.callersSession = SessionState.get();
@@ -106,7 +111,7 @@ class PartitionExport {
           List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
                   forReplicationSpec, hiveConf);
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf)
+          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
                   .export(forReplicationSpec);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index d0aeee5..2a3986a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
+// TODO: this object is created once to call one method and then immediately destroyed.
+//       So it's basically just a roundabout way to pass arguments to a static method. Simplify?
 public class TableExport {
   private static final Logger logger = LoggerFactory.getLogger(TableExport.class);
 
@@ -59,9 +62,10 @@ public class TableExport {
   private final String distCpDoAsUser;
   private final HiveConf conf;
   private final Paths paths;
+  private final MmContext mmCtx;
 
   public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
-      String distCpDoAsUser, HiveConf conf) {
+      String distCpDoAsUser, HiveConf conf, MmContext mmCtx) {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
         && replicationSpec.isInReplicationScope())
@@ -76,6 +80,7 @@ public class TableExport {
     this.distCpDoAsUser = distCpDoAsUser;
     this.conf = conf;
     this.paths = paths;
+    this.mmCtx = mmCtx;
   }
 
   public boolean write() throws SemanticException {
@@ -147,13 +152,13 @@ public class TableExport {
           throw new IllegalStateException("partitions cannot be null for partitionTable :"
               + tableSpec.tableName);
         }
-        new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec);
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec);
       } else {
         List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
                 replicationSpec, conf);
 
         // this is the data copy
-        new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf)
+        new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
             .export(replicationSpec);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 690498f..b61a945 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
+
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
+//TODO: this object is created once to call one method and then immediately destroyed.
+//So it's basically just a roundabout way to pass arguments to a static method. Simplify?
 public class FileOperations {
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
   private final List<Path> dataPathList;
@@ -45,13 +52,15 @@ public class FileOperations {
   private final String distCpDoAsUser;
   private HiveConf hiveConf;
   private final FileSystem dataFileSystem, exportFileSystem;
+  private final MmContext mmCtx;
 
-  public FileOperations(List<Path> dataPathList, Path exportRootDataDir,
-                        String distCpDoAsUser, HiveConf hiveConf) throws IOException {
+  public FileOperations(List<Path> dataPathList, Path exportRootDataDir, String distCpDoAsUser,
+      HiveConf hiveConf, MmContext mmCtx) throws IOException {
     this.dataPathList = dataPathList;
     this.exportRootDataDir = exportRootDataDir;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
+    this.mmCtx = mmCtx;
     if ((dataPathList != null) && !dataPathList.isEmpty()) {
       dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
     } else {
@@ -72,17 +81,59 @@ public class FileOperations {
    * This writes the actual data in the exportRootDataDir from the source.
    */
   private void copyFiles() throws IOException, LoginException {
-    for (Path dataPath : dataPathList) {
-      FileStatus[] fileStatuses =
-              LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath);
-      List<Path> srcPaths = new ArrayList<>();
-      for (FileStatus fileStatus : fileStatuses) {
-        srcPaths.add(fileStatus.getPath());
+    if (mmCtx == null) {
+      for (Path dataPath : dataPathList) {
+        copyOneDataPath(dataPath, exportRootDataDir);
       }
-      new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
+    } else {
+      copyMmPath();
+    }
+  }
+
+  private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException {
+    FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath);
+    List<Path> srcPaths = new ArrayList<>();
+    for (FileStatus fileStatus : fileStatuses) {
+      srcPaths.add(fileStatus.getPath());
     }
+
+    new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths);
   }
 
+  private void copyMmPath() throws LoginException, IOException {
+    assert dataPathList.size() == 1;
+    ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName());
+    Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0));
+    List<Path> validPaths = getMmValidPaths(ids, fromPath);
+    String fromPathStr = fromPath.toString();
+    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+       fromPathStr += Path.SEPARATOR;
+    }
+    for (Path validPath : validPaths) {
+      // Export valid directories with a modified name so they don't look like bases/deltas.
+      // We could also dump the delta contents all together and rename the files if names collide.
+      String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length());
+      Path destPath = new Path(exportRootDataDir, mmChildPath);
+      exportFileSystem.mkdirs(destPath);
+      copyOneDataPath(validPath, destPath);
+    }
+  }
+
+  private List<Path> getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException {
+    Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath);
+    AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids);
+    List<Path> validPaths = new ArrayList<>();
+    Path base = acidState.getBaseDirectory();
+    if (base != null) {
+      validPaths.add(base);
+    }
+    for (ParsedDelta pd : acidState.getCurrentDirectories()) {
+      validPaths.add(pd.getPath());
+    }
+    return validPaths;
+  }
+
+
   /**
    * This needs the root data directory to which the data needs to be exported to.
    * The data export here is a list of files either in table/partition that are written to the _files
@@ -90,8 +141,19 @@ public class FileOperations {
    */
   private void exportFilesAsList() throws SemanticException, IOException {
     try (BufferedWriter writer = writer()) {
-      for (Path dataPath : dataPathList) {
-        writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+      if (mmCtx != null) {
+        assert dataPathList.size() == 1;
+        Path dataPath = dataPathList.get(0);
+        ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(
+            hiveConf, mmCtx.getFqTableName());
+        List<Path> validPaths = getMmValidPaths(ids, dataPath);
+        for (Path mmPath : validPaths) {
+          writeFilesList(listFilesInDir(mmPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
+      } else {
+        for (Path dataPath : dataPathList) {
+          writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index c0e4a43..018983f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,15 +33,10 @@ public class CopyWork implements Serializable {
   private Path[] fromPath;
   private Path[] toPath;
   private boolean errorOnSrcEmpty;
-  private boolean isSkipMmDirs = false;
 
   public CopyWork() {
   }
 
-  public CopyWork(final Path fromPath, final Path toPath) {
-    this(fromPath, toPath, true);
-  }
-
   public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
     this(new Path[] { fromPath }, new Path[] { toPath });
     this.setErrorOnSrcEmpty(errorOnSrcEmpty);
@@ -92,17 +87,4 @@ public class CopyWork implements Serializable {
   public boolean isErrorOnSrcEmpty() {
     return errorOnSrcEmpty;
   }
-
-  /**
-   *  Whether the copy should ignore MM directories in the source, and copy their content to
-   * destination directly, rather than copying the directories themselves.
-   * */
-  public void setSkipSourceMmDirs(boolean isMm) {
-    this.isSkipMmDirs = isMm;
-  }
-
-  public boolean doSkipSourceMmDirs() {
-    return isSkipMmDirs ;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
index 72ce798..d91569e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java
@@ -17,39 +17,65 @@
  */
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-
 @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT,
     Explain.Level.EXTENDED })
 public class ExportWork implements Serializable {
-  private Logger LOG = LoggerFactory.getLogger(ExportWork.class);
+  private static Logger LOG = LoggerFactory.getLogger(ExportWork.class);
 
   private static final long serialVersionUID = 1L;
 
+  public final static class MmContext {
+    private final String fqTableName;
+
+    private MmContext(String fqTableName) {
+      this.fqTableName = fqTableName;
+    }
+
+    @Override
+    public String toString() {
+      return "[" + fqTableName + "]";
+    }
+
+    public static MmContext createIfNeeded(Table t) {
+      if (t == null) return null;
+      if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null;
+      return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName()));
+    }
+
+    public String getFqTableName() {
+      return fqTableName;
+    }
+  }
+
   private final String exportRootDirName;
   private TableSpec tableSpec;
   private ReplicationSpec replicationSpec;
   private String astRepresentationForErrorMsg;
-  private String qualifiedTableName;
+  private String acidFqTableName;
+  private final MmContext mmContext;
 
   /**
-   * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise
+   * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise
    */
   public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec,
-      String astRepresentationForErrorMsg, String qualifiedTable) {
+      String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) {
     this.exportRootDirName = exportRootDirName;
     this.tableSpec = tableSpec;
     this.replicationSpec = replicationSpec;
     this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
-    this.qualifiedTableName = qualifiedTable;
+    this.mmContext = mmContext;
+    this.acidFqTableName = acidFqTableName;
   }
 
   public String getExportRootDir() {
@@ -60,24 +86,16 @@ public class ExportWork implements Serializable {
     return tableSpec;
   }
 
-  public void setTableSpec(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
-  }
-
   public ReplicationSpec getReplicationSpec() {
     return replicationSpec;
   }
 
-  public void setReplicationSpec(ReplicationSpec replicationSpec) {
-    this.replicationSpec = replicationSpec;
-  }
-
   public String getAstRepresentationForErrorMsg() {
     return astRepresentationForErrorMsg;
   }
 
-  public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) {
-    this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+  public MmContext getMmContext() {
+    return mmContext;
   }
 
   /**
@@ -88,10 +106,10 @@ public class ExportWork implements Serializable {
    * for more info.
    */
   public void acidPostProcess(Hive db) throws HiveException {
-    if(qualifiedTableName != null) {
-      LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName +
+    if (acidFqTableName != null) {
+      LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName +
           " using partSpec=" + tableSpec.partSpec);
-      tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true);
+      tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 6faba42..3e2784b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,9 +17,24 @@
  */
 package org.apache.hadoop.hive.ql;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
@@ -47,13 +62,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The LockManager is not ready, but for no-concurrency straight-line path we can
  * test AC=true, and AC=false with commit/rollback/exception and test resulting data.
@@ -152,6 +160,106 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
   }
 
+  @Test
+  public void testMmExim() throws Exception {
+    String tableName = "mm_table", importName = tableName + "_import";
+    runStatementOnDriver("drop table if exists " + tableName);
+    runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
+        "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')",
+        tableName));
+
+    // Regular insert: export some MM deltas, then import into a new table.
+    int[][] rows1 = {{1,2},{3,4}};
+    runStatementOnDriver(String.format("insert into %s (a,b) %s",
+        tableName, makeValuesClause(rows1)));
+    runStatementOnDriver(String.format("insert into %s (a,b) %s",
+        tableName, makeValuesClause(rows1)));
+    IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+    org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(hiveConf);
+    Path exportPath = new Path(table.getSd().getLocation() + "_export");
+    fs.delete(exportPath, true);
+    runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
+    List<String> paths = listPathsRecursive(fs, exportPath);
+    verifyMmExportPaths(paths, 2);
+    runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
+    org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName);
+    Assert.assertEquals(imported.toString(), "insert_only",
+        imported.getParameters().get("transactional_properties"));
+    Path importPath = new Path(imported.getSd().getLocation());
+    FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+    Assert.assertEquals(Arrays.toString(stat), 1, stat.length);
+    assertIsDelta(stat[0]);
+    List<String> allData = stringifyValues(rows1);
+    allData.addAll(stringifyValues(rows1));
+    allData.sort(null);
+    Collections.sort(allData);
+    List<String> rs = runStatementOnDriver(
+        String.format("select a,b from %s order by a,b", importName));
+    Assert.assertEquals("After import: " + rs, allData, rs);
+    runStatementOnDriver("drop table if exists " + importName);
+    
+    // Do insert overwrite to create some invalid deltas, and import into a non-MM table.
+    int[][] rows2 = {{5,6},{7,8}};
+    runStatementOnDriver(String.format("insert overwrite table %s %s",
+        tableName, makeValuesClause(rows2)));
+    fs.delete(exportPath, true);
+    runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath));
+    paths = listPathsRecursive(fs, exportPath);
+    verifyMmExportPaths(paths, 1);
+    runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " +
+        "TBLPROPERTIES ('transactional'='false')", importName));
+    runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath));
+    imported = msClient.getTable("default", importName);
+    Assert.assertNull(imported.toString(), imported.getParameters().get("transactional"));
+    Assert.assertNull(imported.toString(),
+        imported.getParameters().get("transactional_properties"));
+    importPath = new Path(imported.getSd().getLocation());
+    stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter);
+    allData = stringifyValues(rows2);
+    Collections.sort(allData);
+    rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName));
+    Assert.assertEquals("After import: " + rs, allData, rs);
+    runStatementOnDriver("drop table if exists " + importName);
+    runStatementOnDriver("drop table if exists " + tableName);
+    msClient.close();
+  }
+
+  private void assertIsDelta(FileStatus stat) {
+    Assert.assertTrue(stat.toString(),
+        stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX));
+  }
+
+  private void verifyMmExportPaths(List<String> paths, int deltasOrBases) {
+    // 1 file, 1 dir for each, for now. Plus export "data" dir.
+    // This could be changed to a flat file list later.
+    Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size());
+    // No confusing directories in export.
+    for (String path : paths) {
+      Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX));
+      Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX));
+    }
+  }
+
+  private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException {
+    List<String> paths = new ArrayList<>();
+    LinkedList<Path> queue = new LinkedList<>();
+    queue.add(path);
+    while (!queue.isEmpty()) {
+      Path next = queue.pollFirst();
+      FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter);
+      for (FileStatus stat : stats) {
+        Path child = stat.getPath();
+        paths.add(child.toString());
+        if (stat.isDirectory()) {
+          queue.add(child);
+        }
+      }
+    }
+    return paths;
+  }
+
+  
   /**
    * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example)
    * @throws Exception

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 6daac1b..4f1d384 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -477,6 +477,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
   }
   private void testMM(boolean existingTable, boolean isSourceMM) throws Exception {
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    hiveConf.setBoolean("mapred.input.dir.recursive", true);
 
     int[][] data = {{1,2}, {3, 4}, {5, 6}};
     runStatementOnDriver("drop table if exists T");
@@ -500,9 +501,10 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
     //verify that we are indeed doing an Acid write (import)
     rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME");
     Assert.assertEquals(3, rs.size());
-    Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0"));
-    Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0"));
-    Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0"));
+    for (String s : rs) {
+      Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/"));
+      Assert.assertTrue(s, s.endsWith("/000000_0"));
+    }
   }
   private void checkResult(String[][] expectedResult, String query, boolean isVectorized,
       String msg) throws Exception{
@@ -516,6 +518,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
   @Test
   public void testMMExportAborted() throws Exception {
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    hiveConf.setBoolean("mapred.input.dir.recursive", true);
     int[][] data = {{1, 2}, {3, 4}, {5, 6}};
     int[][] dataAbort = {{10, 2}};
     runStatementOnDriver("drop table if exists T");

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index a2adb96..a88a570 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -207,7 +207,8 @@ public abstract class TxnCommandsBaseForTests {
   void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) {
     LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
     logResult(LOG, rs);
-    Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size());
+    Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs,
+        expected.length, rs.size());
     //verify data and layout
     for(int i = 0; i < expected.length; i++) {
       Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/queries/clientpositive/mm_exim.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_exim.q b/ql/src/test/queries/clientpositive/mm_exim.q
index c47342b..a2b6e08 100644
--- a/ql/src/test/queries/clientpositive/mm_exim.q
+++ b/ql/src/test/queries/clientpositive/mm_exim.q
@@ -59,13 +59,13 @@ drop table import1_mm;
 
 drop table import2_mm;
 import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart';
-desc import2_mm;
+desc formatted import2_mm;
 select * from import2_mm order by key, p;
 drop table import2_mm;
 
 drop table import3_mm;
 import table import3_mm from 'ql/test/data/exports/intermmediate_part';
-desc import3_mm;
+desc formatted import3_mm;
 select * from import3_mm order by key, p;
 drop table import3_mm;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7ebcdeb9/ql/src/test/results/clientpositive/llap/mm_exim.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_exim.q.out b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
index 1f40754..c683de3 100644
--- a/ql/src/test/results/clientpositive/llap/mm_exim.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_exim.q.out
@@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT
 #### A masked pattern was here ####
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@import2_mm
-PREHOOK: query: desc import2_mm
+PREHOOK: query: desc formatted import2_mm
 PREHOOK: type: DESCTABLE
 PREHOOK: Input: default@import2_mm
-POSTHOOK: query: desc import2_mm
+POSTHOOK: query: desc formatted import2_mm
 POSTHOOK: type: DESCTABLE
 POSTHOOK: Input: default@import2_mm
+# col_name            	data_type           	comment             
 key                 	int                 	                    
 p                   	int                 	                    
+	 	 
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	MANAGED_TABLE       	 
+Table Parameters:	 	 
+	bucketing_version   	2                   
+	numFiles            	3                   
+	numRows             	6                   
+	rawDataSize         	37                  
+	totalSize           	43                  
+	transactional       	true                
+	transactional_properties	insert_only         
+#### A masked pattern was here ####
+	 	 
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
 PREHOOK: query: select * from import2_mm order by key, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@import2_mm
@@ -338,18 +366,46 @@ POSTHOOK: Output: default@import3_mm
 POSTHOOK: Output: default@import3_mm@p=455
 POSTHOOK: Output: default@import3_mm@p=456
 POSTHOOK: Output: default@import3_mm@p=457
-PREHOOK: query: desc import3_mm
+PREHOOK: query: desc formatted import3_mm
 PREHOOK: type: DESCTABLE
 PREHOOK: Input: default@import3_mm
-POSTHOOK: query: desc import3_mm
+POSTHOOK: query: desc formatted import3_mm
 POSTHOOK: type: DESCTABLE
 POSTHOOK: Input: default@import3_mm
+# col_name            	data_type           	comment             
 key                 	int                 	                    
-p                   	int                 	                    
 	 	 
 # Partition Information	 	 
 # col_name            	data_type           	comment             
 p                   	int                 	                    
+	 	 
+# Detailed Table Information	 	 
+Database:           	default             	 
+#### A masked pattern was here ####
+Retention:          	0                   	 
+#### A masked pattern was here ####
+Table Type:         	MANAGED_TABLE       	 
+Table Parameters:	 	 
+	bucketing_version   	2                   
+	numFiles            	3                   
+	numPartitions       	3                   
+	numRows             	6                   
+	rawDataSize         	13                  
+	totalSize           	19                  
+	transactional       	true                
+	transactional_properties	insert_only         
+#### A masked pattern was here ####
+	 	 
+# Storage Information	 	 
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe	 
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat	 
+Compressed:         	No                  	 
+Num Buckets:        	-1                  	 
+Bucket Columns:     	[]                  	 
+Sort Columns:       	[]                  	 
+Storage Desc Params:	 	 
+	serialization.format	1                   
 PREHOOK: query: select * from import3_mm order by key, p
 PREHOOK: type: QUERY
 PREHOOK: Input: default@import3_mm