Posted to commits@hive.apache.org by an...@apache.org on 2019/01/08 08:26:16 UTC

[1/3] hive git commit: HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)

Repository: hive
Updated Branches:
  refs/heads/master 0dbb896cf -> b3ef75eaa


http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index f05c231..dac20d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -29,12 +28,16 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
 
 public class TableSerializer implements JsonWriter.Serializer {
   public static final String FIELD_NAME = "table";
+  private static final Logger LOG = LoggerFactory.getLogger(TableSerializer.class);
+
   private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
   private final Iterable<Partition> partitions;
   private final HiveConf hiveConf;
@@ -53,8 +56,9 @@ public class TableSerializer implements JsonWriter.Serializer {
       return;
     }
 
-    Table tTable = tableHandle.getTTable();
-    tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider);
+    Table tTable = updatePropertiesInTable(
+        tableHandle.getTTable(), additionalPropertiesProvider
+    );
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
@@ -83,14 +87,6 @@ public class TableSerializer implements JsonWriter.Serializer {
                 ReplicationSpec.KEY.CURR_STATE_ID.toString(),
                 additionalPropertiesProvider.getCurrentReplicationState());
       }
-      if (isExternalTable(table)) {
-          // Replication destination will not be external - override if set
-        table.putToParameters("EXTERNAL", "FALSE");
-      }
-      if (isExternalTableType(table)) {
-          // Replication dest will not be external - override if set
-        table.setTableType(TableType.MANAGED_TABLE.toString());
-      }
     } else {
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
       // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
@@ -101,17 +97,6 @@ public class TableSerializer implements JsonWriter.Serializer {
     return table;
   }
 
-  private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
-    return table.isSetTableType()
-        && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
-  }
-
-  private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
-    Map<String, String> params = table.getParameters();
-    return params.containsKey("EXTERNAL")
-        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
-  }
-
   private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     writer.jsonGenerator.writeStartArray();
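
For reference, the two helpers deleted above (isExternalTable and isExternalTableType) amount to the predicate sketched below, folded into one method for illustration; with them gone, the dump no longer rewrites external tables as managed ones (see the repl_2_exim_basic.q.out change later in this commit). This is a sketch reassembled from the removed lines, not code that remains in TableSerializer:

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.metastore.api.Table;

    // Sketch only: the two checks the dump side used to decide whether to force
    // an external table to MANAGED_TABLE in the serialized metadata.
    final class RemovedExternalCheck {
      static boolean isExternal(Table table) {
        Map<String, String> params = table.getParameters();
        boolean byParam = params != null
            && "TRUE".equalsIgnoreCase(params.get("EXTERNAL"));        // was isExternalTable
        boolean byType = table.isSetTableType()
            && TableType.EXTERNAL_TABLE.toString().equalsIgnoreCase(table.getTableType()); // was isExternalTableType
        return byParam || byType;
      }
    }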

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
index 9907133..b04fdef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
@@ -36,7 +36,6 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -99,14 +98,7 @@ public class MetadataJson {
   }
 
   private ReplicationSpec readReplicationSpec() {
-    com.google.common.base.Function<String, String> keyFetcher =
-        new com.google.common.base.Function<String, String>() {
-          @Override
-          public String apply(@Nullable String s) {
-            return jsonEntry(s);
-          }
-        };
-    return new ReplicationSpec(keyFetcher);
+    return new ReplicationSpec(this::jsonEntry);
   }
 
   private void checkCompatibility() throws SemanticException, JSONException {
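
The MetadataJson change above is a mechanical refactor: the anonymous com.google.common.base.Function is replaced by a method reference, which works because Guava's Function is a single-abstract-method interface. A standalone illustration of the same pattern, using hypothetical names:

    import com.google.common.base.Function;

    class KeyFetcherExample {                     // hypothetical class, illustration only
      private String jsonEntry(String key) {      // stand-in for MetadataJson#jsonEntry
        return "value-for-" + key;
      }

      Function<String, String> before() {         // pre-patch style: anonymous inner class
        return new Function<String, String>() {
          @Override
          public String apply(String s) {
            return jsonEntry(s);
          }
        };
      }

      Function<String, String> after() {          // post-patch style: method reference
        return this::jsonEntry;
      }
    }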

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
index d412fd7..fe89ab2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -17,17 +17,41 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 public class InsertHandler extends AbstractMessageHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(InsertHandler.class);
+
   @Override
   public List<Task<? extends Serializable>> handle(Context withinContext)
       throws SemanticException {
+    try {
+      FileSystem fs =
+          FileSystem.get(new Path(withinContext.location).toUri(), withinContext.hiveConf);
+      MetaData metaData =
+          EximUtil.readMetaData(fs, new Path(withinContext.location, EximUtil.METADATA_NAME));
+      ReplicationSpec replicationSpec = metaData.getReplicationSpec();
+      if (replicationSpec.isNoop()) {
+        return Collections.emptyList();
+      }
+    } catch (Exception e) {
+      LOG.error("failed to load insert event", e);
+      throw new SemanticException(e);
+    }
+
     InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
     String actualDbName =
         withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index cdf51dd..4ae4894 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -89,6 +89,10 @@ public interface MessageHandler {
       return StringUtils.isEmpty(dbName);
     }
 
+    /**
+     * not sure why we have this, this should always be read from the _metadata file via the
+     * {@link org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson#readReplicationSpec}
+     */
     ReplicationSpec eventOnlyReplicationSpec() throws SemanticException {
       String eventId = dmd.getEventTo().toString();
       return new ReplicationSpec(eventId, eventId);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index f5f4459..56c2abe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -17,36 +17,56 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_PARTITION;
-import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_TABLE;
-
 public class TableHandler extends AbstractMessageHandler {
+  private static final long DEFAULT_WRITE_ID = 0L;
+  private static final Logger LOG = LoggerFactory.getLogger(TableHandler.class);
+
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
     try {
       List<Task<? extends Serializable>> importTasks = new ArrayList<>();
-      long writeId = 0;
+      boolean isExternal = false, isLocationSet = false;
+      String parsedLocation = null;
 
-      if (context.dmd.getDumpType().equals(EVENT_ALTER_TABLE)) {
-        AlterTableMessage message = deserializer.getAlterTableMessage(context.dmd.getPayload());
-        writeId = message.getWriteId();
-      } else if (context.dmd.getDumpType().equals(EVENT_ALTER_PARTITION)) {
-        AlterPartitionMessage message = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
-        writeId = message.getWriteId();
+      DumpType eventType = context.dmd.getDumpType();
+      Tuple tuple = extract(context);
+      if (tuple.isExternalTable) {
+        URI fromURI = EximUtil.getValidatedURI(context.hiveConf, context.location);
+        Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+        isLocationSet = true;
+        isExternal = true;
+        FileSystem fs = FileSystem.get(fromURI, context.hiveConf);
+        try {
+          MetaData rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+          Table table = new Table(rv.getTable());
+          parsedLocation = ReplExternalTables
+              .externalTableLocation(context.hiveConf, table.getSd().getLocation());
+        } catch (IOException e) {
+          throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+        }
       }
 
       context.nestedContext.setConf(context.hiveConf);
@@ -54,13 +74,13 @@ public class TableHandler extends AbstractMessageHandler {
           new EximUtil.SemanticAnalyzerWrapperContext(
               context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
               context.nestedContext);
-      x.setEventType(context.dmd.getDumpType());
+      x.setEventType(eventType);
 
       // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs.
       // Also, REPL LOAD doesn't support external table and hence no location set as well.
-      ImportSemanticAnalyzer.prepareImport(false, false, false, false,
-          (context.precursor != null), null, context.tableName, context.dbName,
-          null, context.location, x, updatedMetadata, context.getTxnMgr(), writeId);
+      ImportSemanticAnalyzer.prepareImport(false, isLocationSet, isExternal, false,
+          (context.precursor != null), parsedLocation, context.tableName, context.dbName,
+          null, context.location, x, updatedMetadata, context.getTxnMgr(), tuple.writeId);
 
       Task<? extends Serializable> openTxnTask = x.getOpenTxnTask();
       if (openTxnTask != null && !importTasks.isEmpty()) {
@@ -71,8 +91,57 @@ public class TableHandler extends AbstractMessageHandler {
       }
 
       return importTasks;
+    } catch (RuntimeException e){
+      throw e;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private Tuple extract(Context context) throws SemanticException {
+    try {
+      String tableType = null;
+      long writeId = DEFAULT_WRITE_ID;
+      switch (context.dmd.getDumpType()) {
+      case EVENT_CREATE_TABLE:
+      case EVENT_ADD_PARTITION:
+        Path metadataPath = new Path(context.location, EximUtil.METADATA_NAME);
+        MetaData rv = EximUtil.readMetaData(
+            metadataPath.getFileSystem(context.hiveConf),
+            metadataPath
+        );
+        tableType = rv.getTable().getTableType();
+        break;
+      case EVENT_ALTER_TABLE:
+        AlterTableMessage alterTableMessage =
+            deserializer.getAlterTableMessage(context.dmd.getPayload());
+        tableType = alterTableMessage.getTableObjAfter().getTableType();
+        writeId = alterTableMessage.getWriteId();
+        break;
+      case EVENT_ALTER_PARTITION:
+        AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+        tableType = msg.getTableObj().getTableType();
+        writeId = msg.getWriteId();
+        break;
+      default:
+        break;
+      }
+      boolean isExternalTable = tableType != null
+          && TableType.EXTERNAL_TABLE.equals(Enum.valueOf(TableType.class, tableType));
+      return new Tuple(isExternalTable, writeId);
     } catch (Exception e) {
+      LOG.error("failed to determine if the table associated with the event is external or not", e);
       throw new SemanticException(e);
     }
   }
+
+  private static final class Tuple {
+    private final boolean isExternalTable;
+    private final long writeId;
+
+    private Tuple(boolean isExternalTable, long writeId) {
+      this.isExternalTable = isExternalTable;
+      this.writeId = writeId;
+    }
+  }
 }
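
Note that the retained comment above the prepareImport call ("REPL LOAD doesn't support external table and hence no location set as well") predates this patch; for external tables the handler now passes isLocationSet/isExternal along with the re-rooted location. Also, the external-table test in extract() goes through Enum.valueOf, which throws for an unknown string and is handled by the surrounding catch; a direct name comparison is equivalent for valid values. A sketch, not the committed code:

    import org.apache.hadoop.hive.metastore.TableType;

    final class ExternalTableCheck {               // illustration only, not part of the patch
      // Equivalent to the Enum.valueOf based check in extract(); an unexpected
      // tableType string yields false here instead of an IllegalArgumentException.
      static boolean isExternal(String tableType) {
        return tableType != null && TableType.EXTERNAL_TABLE.name().equals(tableType);
      }
    }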

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 9e5c071..39f342f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,9 +46,11 @@ import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ Utils.class })
+@PrepareForTest({ Utils.class, ReplDumpTask.class})
 @PowerMockIgnore({ "javax.management.*" })
 public class TestReplDumpTask {
 
@@ -111,12 +115,17 @@ public class TestReplDumpTask {
     when(hive.getAllFunctions()).thenReturn(Collections.emptyList());
     when(queryState.getConf()).thenReturn(conf);
     when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
+    when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
+
+    whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
+    whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
 
     ReplDumpTask task = new StubReplDumpTask() {
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
+          long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
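
A note on the PowerMock additions above: whenNew only intercepts constructor calls made from classes listed in @PrepareForTest, which is why ReplDumpTask.class joins Utils.class in that annotation. A minimal sketch of the pattern with hypothetical classes:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    import static org.junit.Assert.assertSame;
    import static org.mockito.Mockito.mock;
    import static org.powermock.api.mockito.PowerMockito.whenNew;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ WhenNewExampleTest.Caller.class })   // the class that performs `new Collaborator()`
    public class WhenNewExampleTest {                      // hypothetical names throughout

      static class Collaborator { }

      static class Caller {
        Collaborator create() {
          return new Collaborator();                       // intercepted because Caller is prepared
        }
      }

      @Test
      public void constructorIsStubbed() throws Exception {
        Collaborator stubbed = mock(Collaborator.class);
        whenNew(Collaborator.class).withAnyArguments().thenReturn(stubbed);
        assertSame(stubbed, new Caller().create());        // the real constructor is bypassed
      }
    }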

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index 5b75ca8..b9119a9 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -68,7 +68,6 @@ show create table ext_t_imported;
 select * from ext_t_imported;
 
 -- should have repl.last.id
--- also - importing an external table replication export would turn the new table into a managed table
 import table ext_t_r_imported from 'ql/test/data/exports/ext_t_r';
 describe extended ext_t_imported;
 show table extended like ext_t_r_imported;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
index 40b6ad7..950b5e4 100644
--- a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
+++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
@@ -411,7 +411,7 @@ PREHOOK: Input: default@ext_t_r_imported
 POSTHOOK: query: show create table ext_t_r_imported
 POSTHOOK: type: SHOW_CREATETABLE
 POSTHOOK: Input: default@ext_t_r_imported
-CREATE TABLE `ext_t_r_imported`(
+CREATE EXTERNAL TABLE `ext_t_r_imported`(
   `emp_id` int COMMENT 'employee id')
 PARTITIONED BY ( 
   `emp_country` string, 
@@ -425,7 +425,6 @@ OUTPUTFORMAT
 LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
-  'EXTERNAL'='FALSE', 
   'bucketing_version'='2', 
   'discover.partitions'='true', 
   'repl.last.id'='0', 

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/testutils/ptest2/conf/deployed/master-mr2.properties
----------------------------------------------------------------------
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 6f0056a..9166f4a 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,7 @@ ut.service.batchSize=8
 
 unitTests.module.itests.hive-unit=itests.hive-unit
 ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat
 
 unitTests.module.itests.qtest=itests.qtest
 ut.itests.qtest.batchSize=9


[3/3] hive git commit: HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)

Posted by an...@apache.org.
HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3ef75ea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3ef75ea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3ef75ea

Branch: refs/heads/master
Commit: b3ef75eaa1e828f8c80d95ea7c32abcd1f000ef4
Parents: 0dbb896
Author: Anishek Agarwal <an...@gmail.com>
Authored: Tue Jan 8 13:56:02 2019 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Tue Jan 8 13:56:02 2019 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   2 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../parse/BaseReplicationAcrossInstances.java   |  83 ++++
 .../TestReplTableMigrationWithJsonFormat.java   |  29 ++
 .../hive/ql/parse/TestReplicationScenarios.java |   4 +-
 ...TestReplicationScenariosAcrossInstances.java | 200 ++-------
 .../TestReplicationScenariosExternalTables.java | 420 +++++++++++++++++++
 ...ationScenariosIncrementalLoadAcidTables.java |   3 -
 .../TestReplicationScenariosMigration.java      |  33 --
 .../TestReplicationWithTableMigration.java      | 235 +++++++----
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  81 +---
 .../java/org/apache/hadoop/hive/ql/Context.java |   5 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   5 +
 .../exec/repl/ExternalTableCopyTaskBuilder.java | 150 +++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |  96 +++--
 .../hive/ql/exec/repl/ReplExternalTables.java   | 272 ++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java  |  53 ++-
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java  |  19 +-
 .../filesystem/BootstrapEventsIterator.java     |   1 +
 .../filesystem/DatabaseEventsIterator.java      |   6 +
 .../events/filesystem/FSTableEvent.java         |  44 +-
 .../bootstrap/load/table/LoadPartitions.java    |  91 ++--
 .../repl/bootstrap/load/table/LoadTable.java    |  73 +++-
 .../IncrementalLoadTasksBuilder.java            |  15 +-
 .../hive/ql/exec/repl/util/ReplUtils.java       |   3 +-
 .../apache/hadoop/hive/ql/metadata/Table.java   |   2 +-
 .../hive/ql/parse/BaseSemanticAnalyzer.java     |   3 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 116 +++--
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  34 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |   9 +
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |   2 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |   7 +-
 .../repl/dump/events/DropTableHandler.java      |   3 +-
 .../parse/repl/dump/events/InsertHandler.java   |   4 +
 .../parse/repl/dump/io/PartitionSerializer.java |   4 -
 .../ql/parse/repl/dump/io/TableSerializer.java  |  29 +-
 .../hive/ql/parse/repl/load/MetadataJson.java   |  10 +-
 .../parse/repl/load/message/InsertHandler.java  |  24 ++
 .../parse/repl/load/message/MessageHandler.java |   4 +
 .../parse/repl/load/message/TableHandler.java   | 103 ++++-
 .../hive/ql/exec/repl/TestReplDumpTask.java     |  13 +-
 .../queries/clientpositive/repl_2_exim_basic.q  |   1 -
 .../clientpositive/repl_2_exim_basic.q.out      |   3 +-
 .../ptest2/conf/deployed/master-mr2.properties  |   2 +-
 44 files changed, 1711 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 56748fd..23a3a6b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -638,6 +638,8 @@ public final class FileUtils {
   public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
       boolean deleteSource, String doAsUser,
       HiveConf conf, HadoopShims shims) throws IOException {
+    LOG.debug("copying srcPaths : {}, to DestPath :{} ,with doAs: {}",
+        StringUtils.join(",", srcPaths), dst.toString(), doAsUser);
     boolean copied = false;
     if (doAsUser == null){
       copied = shims.runDistCp(srcPaths, dst, conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a7c4ab..b213609 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -502,6 +502,10 @@ public class HiveConf extends Configuration {
         + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n "
         + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n"
         + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
+    REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", "/",
+        "This is the base directory on the target/replica warehouse under which data for "
+            + "external tables is stored. This is relative base path and hence prefixed to the source "
+            + "external table path on target cluster."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
new file mode 100644
index 0000000..d321cca
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+public class BaseReplicationAcrossInstances {
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(BaseReplicationAcrossInstances.class);
+  static WarehouseInstance primary;
+  static WarehouseInstance replica;
+  String primaryDbName, replicatedDbName;
+  static HiveConf conf;
+
+  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+      throws Exception {
+    conf = new HiveConf(clazz);
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    Map<String, String> localOverrides = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+    }};
+    localOverrides.putAll(overrides);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+        SOURCE_OF_REPLICATION + "' = '1,2,3')");
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
new file mode 100644
index 0000000..0151ed0
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+
+public class TestReplTableMigrationWithJsonFormat extends TestReplicationWithTableMigration {
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    internalBeforeClassSetup(Collections.emptyMap());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 98cbd97..c85a2a4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -91,6 +91,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -393,7 +394,8 @@ public class TestReplicationScenarios {
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
-            null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
+            null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+        Collections.emptyList());
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
     replLoadTask.executeTask(null);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index b50f9a8..0df99b3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -22,35 +22,29 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
-import org.apache.hadoop.hive.ql.util.DependencyResolver;
-import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.util.DependencyResolver;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
@@ -59,8 +53,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,63 +62,17 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-
-public class TestReplicationScenariosAcrossInstances {
-  @Rule
-  public final TestName testName = new TestName();
-
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
 
+public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     HashMap<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+        UserGroupInformation.getCurrentUser().getUserName());
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
-  }
-
-  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
-      throws Exception {
-    conf = new HiveConf(clazz);
-    conf.set("dfs.client.use.datanode.hostname", "true");
-    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
-    MiniDFSCluster miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    Map<String, String> localOverrides = new HashMap<String, String>() {{
-      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-      put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
-    }};
-    localOverrides.putAll(overrides);
-    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
-  }
-
-  @AfterClass
-  public static void classLevelTearDown() throws IOException {
-    primary.close();
-    replica.close();
-  }
-
-  @Before
-  public void setup() throws Throwable {
-    primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
-    replicatedDbName = "replicated_" + primaryDbName;
-    primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
-            SOURCE_OF_REPLICATION + "' = '1,2,3')");
-  }
-
-  @After
-  public void tearDown() throws Throwable {
-    primary.run("drop database if exists " + primaryDbName + " cascade");
-    replica.run("drop database if exists " + replicatedDbName + " cascade");
+    internalBeforeClassSetup(overrides, TestReplicationScenariosAcrossInstances.class);
   }
 
   @Test
@@ -365,8 +313,10 @@ public class TestReplicationScenariosAcrossInstances {
         .dump(primaryDbName, null);
 
     // each table creation itself takes more than one task, give we are giving a max of 1, we should hit multiple runs.
-    replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1);
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    List<String> withClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+
+    replica.load(replicatedDbName, tuple.dumpLocation, withClause)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "t1", "t2", "t3" })
@@ -433,7 +383,8 @@ public class TestReplicationScenariosAcrossInstances {
         .run("create table table2 (a int, city string) partitioned by (country string)")
         .run("create table table3 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+        .dump(primaryDbName, null,
+            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -1181,7 +1132,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("use " + importDbFromReplica)
             .run("import table t1 from " + exportPath)
             .run("select country from t1")
-            .verifyResults(Arrays.asList("india"));
+            .verifyResults(Collections.singletonList("india"));
 
     // Check if table/partition in C doesn't have ckpt property
     t1 = replica.getTable(importDbFromReplica, "t1");
@@ -1568,107 +1519,6 @@ public class TestReplicationScenariosAcrossInstances {
             .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }
 
-  @Test
-  public void testDumpExternalTableSetFalse() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary
-            .run("use " + primaryDbName)
-            .run("create external table t1 (id int)")
-            .run("insert into table t1 values (1)")
-            .run("insert into table t1 values (2)")
-            .run("create external table t2 (place string) partitioned by (country string)")
-            .run("insert into table t2 partition(country='india') values ('bangalore')")
-            .run("insert into table t2 partition(country='us') values ('austin')")
-            .run("insert into table t2 partition(country='france') values ('paris')")
-            .dump(primaryDbName, null);
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("repl status " + replicatedDbName)
-            .verifyResult(tuple.lastReplicationId)
-            .run("use " + replicatedDbName)
-            .run("show tables like 't1'")
-            .verifyFailure(new String[] {"t1"})
-            .run("show tables like 't2'")
-            .verifyFailure(new String[] {"t2"});
-
-    tuple = primary.run("use " + primaryDbName)
-            .run("create external table t3 (id int)")
-            .run("insert into table t3 values (10)")
-            .run("insert into table t3 values (20)")
-            .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
-                    + " with ('hive.repl.dump.metadata.only'='true')");
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("show tables like 't3'")
-            .verifyResult("t3")
-            .run("select id from t3 where id = 10")
-            .verifyFailure(new String[] {"10"});
-  }
-
-  @Test
-  public void testDumpExternalTableSetTrue() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary
-            .run("use " + primaryDbName)
-            .run("create external table t1 (id int)")
-            .run("insert into table t1 values (1)")
-            .run("insert into table t1 values (2)")
-            .run("create external table t2 (place string) partitioned by (country string)")
-            .run("insert into table t2 partition(country='india') values ('bangalore')")
-            .run("insert into table t2 partition(country='us') values ('austin')")
-            .run("insert into table t2 partition(country='france') values ('paris')")
-            .dump("repl dump " + primaryDbName + " with ('hive.repl.include.external.tables'='true')");
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("show tables like 't1'")
-            .verifyResult("t1")
-            .run("show tables like 't2'")
-            .verifyResult("t2")
-            .run("repl status " + replicatedDbName)
-            .verifyResult(tuple.lastReplicationId)
-            .run("select country from t2 where country = 'us'")
-            .verifyResult("us")
-            .run("select country from t2 where country = 'france'")
-            .verifyResult("france");
-
-    tuple = primary.run("use " + primaryDbName)
-            .run("create external table t3 (id int)")
-            .run("insert into table t3 values (10)")
-            .run("create external table t4 as select id from t3")
-            .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
-                    + " with ('hive.repl.include.external.tables'='true')");
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("show tables like 't3'")
-            .verifyResult("t3")
-            .run("select id from t3")
-            .verifyResult("10")
-            .run("select id from t4")
-            .verifyResult(null); // Returns null as create table event doesn't list files
-  }
-
-  @Test
-  public void testDumpExternalTableWithAddPartitionEvent() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
-
-    replica.load(replicatedDbName, tuple.dumpLocation);
-
-    tuple = primary.run("use " + primaryDbName)
-            .run("create external table t1 (place string) partitioned by (country string)")
-            .run("alter table t1 add partition(country='india')")
-            .run("alter table t1 add partition(country='us')")
-            .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
-                    + " with ('hive.repl.include.external.tables'='true')");
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("show tables like 't1'")
-            .verifyResult("t1")
-            .run("show partitions t1")
-            .verifyResults(new String[] { "country=india", "country=us" });
-  }
-
   // This requires the tables are loaded in a fixed sorted order.
   @Test
   public void testBootstrapLoadRetryAfterFailureForAlterTable() throws Throwable {

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
new file mode 100644
index 0000000..0e3cefc
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances {
+
+  private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+        UserGroupInformation.getCurrentUser().getUserName());
+
+    internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+  }
+
+  @Test
+  public void replicationWithoutExternalTables() throws Throwable {
+    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> dumpWithClause = Collections.singletonList
+        ("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'");
+
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("insert into table t2 partition(country='france') values ('paris')")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+    assertFalse(primary.miniDFSCluster.getFileSystem()
+        .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyFailure(new String[] { "t1" })
+        .run("show tables like 't2'")
+        .verifyFailure(new String[] { "t2" });
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("insert into table t3 values (20)")
+        .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause);
+
+    // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+    assertFalse(primary.miniDFSCluster.getFileSystem()
+        .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't3'")
+        .verifyFailure(new String[] { "t3" });
+  }
+
+  @Test
+  public void externalTableReplicationWithDefaultPaths() throws Throwable {
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2)")
+        .run("create external table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('bangalore')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("insert into table t2 partition(country='france') values ('paris')")
+        .dump("repl dump " + primaryDbName);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"),
+        new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
+
+    List<String> withClauseOptions = externalTableBasePathWithClause();
+
+    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("select country from t2 where country = 'us'")
+        .verifyResult("us")
+        .run("select country from t2 where country = 'france'")
+        .verifyResult("france");
+
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t3 (id int)")
+        .run("insert into table t3 values (10)")
+        .run("create external table t4 as select id from t3")
+        .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"),
+        new Path(tuple.dumpLocation, FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't3'")
+        .verifyResult("t3")
+        .run("select id from t3")
+        .verifyResult("10")
+        .run("select id from t4")
+        .verifyResult("10");
+
+    assertTablePartitionLocation(primaryDbName + ".t3", replicatedDbName + ".t3");
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("drop table t1")
+        .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"),
+        new Path(tuple.dumpLocation, FILE_NAME));
+  }
+
+  /**
+   * @param sourceTableName  -- Provide the fully qualified table name
+   * @param replicaTableName -- Provide the fully qualified table name
+   */
+  private void assertTablePartitionLocation(String sourceTableName, String replicaTableName)
+      throws HiveException {
+    Hive hiveForPrimary = Hive.get(primary.hiveConf);
+    Table sourceTable = hiveForPrimary.getTable(sourceTableName);
+    Path sourceLocation = sourceTable.getDataLocation();
+    Hive hiveForReplica = Hive.get(replica.hiveConf);
+    Table replicaTable = hiveForReplica.getTable(replicaTableName);
+    Path dataLocation = replicaTable.getDataLocation();
+    assertEquals(REPLICA_EXTERNAL_BASE + sourceLocation.toUri().getPath(),
+        dataLocation.toUri().getPath());
+    if (sourceTable.isPartitioned()) {
+      Set<Partition> sourcePartitions = hiveForPrimary.getAllPartitionsOf(sourceTable);
+      Set<Partition> replicaPartitions = hiveForReplica.getAllPartitionsOf(replicaTable);
+      assertEquals(sourcePartitions.size(), replicaPartitions.size());
+      List<String> expectedPaths =
+          sourcePartitions.stream()
+              .map(p -> REPLICA_EXTERNAL_BASE + p.getDataLocation().toUri().getPath())
+              .collect(Collectors.toList());
+      List<String> actualPaths =
+          replicaPartitions.stream()
+              .map(p -> p.getDataLocation().toUri().getPath())
+              .collect(Collectors.toList());
+      assertTrue(expectedPaths.containsAll(actualPaths));
+    }
+  }
+
+  @Test
+  public void externalTableReplicationWithCustomPaths() throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
+        .run("create external table a (i int, j int) "
+            + "row format delimited fields terminated by ',' "
+            + "location '" + externalTableLocation.toUri() + "'")
+        .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 'a'")
+        .verifyResults(Collections.singletonList("a"))
+        .run("select * From a").verifyResults(Collections.emptyList());
+
+    assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+
+    //externally add data to location
+    try (FSDataOutputStream outputStream =
+        fs.create(new Path(externalTableLocation, "file1.txt"))) {
+      outputStream.write("1,2\n".getBytes());
+      outputStream.write("13,21\n".getBytes());
+    }
+
+    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+        .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+        .run("select i From a")
+        .verifyResults(new String[] { "1", "13" })
+        .run("select j from a")
+        .verifyResults(new String[] { "2", "21" });
+
+    // alter table location to something new.
+    externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
+    incrementalTuple = primary.run("use " + primaryDbName)
+        .run("alter table a set location '" + externalTableLocation + "'")
+        .dump(primaryDbName, incrementalTuple.lastReplicationId);
+
+    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select i From a")
+        .verifyResults(Collections.emptyList());
+    assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+  }
+
+  @Test
+  public void externalTableWithPartitions() throws Throwable {
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/t2/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t2 (place string) partitioned by (country string) row format "
+            + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+            + "'")
+        .run("insert into t2 partition(country='india') values ('bangalore')")
+        .dump("repl dump " + primaryDbName);
+
+    assertExternalFileInfo(Collections.singletonList("t2"),
+        new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't2'")
+        .verifyResults(new String[] { "t2" })
+        .run("select place from t2")
+        .verifyResults(new String[] { "bangalore" });
+
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    // add new data externally to a partition, but under the table-level top directory
+    Path partitionDir = new Path(externalTableLocation, "country=india");
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
+      outputStream.write("pune\n".getBytes());
+      outputStream.write("mumbai\n".getBytes());
+    }
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("insert into t2 partition(country='australia') values ('sydney')")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    assertExternalFileInfo(Collections.singletonList("t2"),
+        new Path(tuple.dumpLocation, FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select distinct(country) from t2")
+        .verifyResults(new String[] { "india", "australia" })
+        .run("select place from t2 where country='india'")
+        .verifyResults(new String[] { "bangalore", "pune", "mumbai" })
+        .run("select place from t2 where country='australia'")
+        .verifyResults(new String[] { "sydney" });
+
+    Path customPartitionLocation =
+        new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france");
+    fs.mkdirs(customPartitionLocation, new FsPermission("777"));
+
+    // add a new partition to the table, at a location outside the table-level directory
+    try (FSDataOutputStream outputStream = fs
+        .create(new Path(customPartitionLocation, "file.txt"))) {
+      outputStream.write("paris".getBytes());
+    }
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
+            .toString() + "'")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select place from t2 where country='france'")
+        .verifyResults(new String[] { "paris" });
+
+    // change the location of the partition via alter command
+    String tmpLocation = "/tmp/" + System.nanoTime();
+    primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select place from t2 where country='france'")
+        .verifyResults(new String[] {});
+  }
+
+  @Test
+  public void externalTableIncrementalReplication() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
+    replica.load(replicatedDbName, tuple.dumpLocation);
+
+    tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (place string) partitioned by (country string)")
+        .run("alter table t1 add partition(country='india')")
+        .run("alter table t1 add partition(country='us')")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    List<String> loadWithClause = externalTableBasePathWithClause();
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show partitions t1")
+        .verifyResults(new String[] { "country=india", "country=us" });
+
+    Hive hive = Hive.get(replica.getConf());
+    Set<Partition> partitions =
+        hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1"));
+    List<String> paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath())
+        .collect(Collectors.toList());
+
+    tuple = primary
+        .run("alter table t1 drop partition (country='india')")
+        .run("alter table t1 drop partition (country='us')")
+        .dump(primaryDbName, tuple.lastReplicationId);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("select * From t1")
+        .verifyResults(new String[] {});
+
+    for (String path : paths) {
+      assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
+    }
+
+  }
+
+  private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
+    Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
+    DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
+    externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem);
+    fileSystem.mkdirs(externalTableLocation);
+
+    // this is required since the same filesystem is used in both source and target
+    return Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+            + externalTableLocation.toString() + "'"
+    );
+  }
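+
+  // For illustration only (an assumed usage, not part of this patch): the single clause built by
+  // externalTableBasePathWithClause() is handed to replica.load(...) and ends up in the WITH clause
+  // of REPL LOAD, so the effective statement would look roughly like
+  //   REPL LOAD replicated_db FROM '<dumpLocation>'
+  //       WITH ('<REPL_EXTERNAL_TABLE_BASE_DIR.varname>'='hdfs://<namenode>/<replica external base>');
+  // The placeholder values and the exact load syntax above are assumptions for readability; only the
+  // config key reference is taken from the code.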
+
+  private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
+      throws IOException {
+    DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
+    assertTrue(fileSystem.exists(externalTableInfoFile));
+    InputStream inputStream = fileSystem.open(externalTableInfoFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+    Set<String> tableNames = new HashSet<>();
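+    // A hedged sketch of what a single line in the external tables info file is expected to look
+    // like, based only on the assertion below (tableName,base64encoded(data_location)); the exact
+    // encoder used by the dump side is not visible in this test and is assumed to be plain base64:
+    //   t1,<base64("hdfs://<namenode>/path/to/t1")>
+    // so a reader could recover the location with something like
+    //   new String(java.util.Base64.getDecoder().decode(components[1]),
+    //       java.nio.charset.StandardCharsets.UTF_8)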
+    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+      String[] components = line.split(",");
+      assertEquals("The file should have tableName,base64encoded(data_location)",
+          2, components.length);
+      tableNames.add(components[0]);
+      assertTrue(components[1].length() > 0);
+    }
+    assertTrue(expected.containsAll(tableNames));
+    reader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 97775b3..5529d9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -22,10 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
-
 import org.junit.rules.TestName;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
deleted file mode 100644
index 5b8e424..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import java.util.HashMap;
-import org.junit.BeforeClass;
-
-public class TestReplicationScenariosMigration extends org.apache.hadoop.hive.ql.parse.TestReplicationScenarios {
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    HashMap<String, String> overrideProperties = new HashMap<>();
-    overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
-        GzipJSONMessageEncoder.class.getCanonicalName());
-    internalBeforeClassSetup(overrideProperties, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index ec64f4b..58561d4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -17,19 +17,24 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +52,6 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.junit.rules.TestName;
-import com.google.common.collect.Lists;
 import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
 import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
 import static org.junit.Assert.assertEquals;
@@ -58,38 +62,48 @@ import static org.junit.Assert.assertTrue;
  * TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables)
  */
 public class TestReplicationWithTableMigration {
+  private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
+
   @Rule
   public final TestName testName = new TestName();
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class);
   private static WarehouseInstance primary, replica;
   private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
+  private Path avroSchemaFile = null;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    conf = new HiveConf(TestReplicationWithTableMigration.class);
+    HashMap<String, String> overrideProperties = new HashMap<>();
+    overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    internalBeforeClassSetup(overrideProperties);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception {
+    HiveConf conf = new HiveConf(TestReplicationWithTableMigration.class);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
-           new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
-        put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-        put("hive.support.concurrency", "true");
-        put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.metastore.client.capability.check", "false");
-        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
-        put("hive.exec.dynamic.partition.mode", "nonstrict");
-        put("hive.strict.checks.bucketing", "false");
-        put("hive.mapred.mode", "nonstrict");
-        put("mapred.input.dir.recursive", "true");
-        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
-        put("hive.strict.managed.tables", "true");
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    final DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+    HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{
+      put("fs.defaultFS", fs.getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.strict.managed.tables", "true");
     }};
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
 
-    HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
-      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+    HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
+      put("fs.defaultFS", fs.getUri().toString());
       put("hive.metastore.client.capability.check", "false");
       put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
       put("hive.exec.dynamic.partition.mode", "nonstrict");
@@ -101,7 +115,40 @@ public class TestReplicationWithTableMigration {
       put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
       put("hive.strict.managed.tables", "false");
     }};
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
+    configsForPrimary.putAll(overrideConfigs);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+  }
+
+  private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
+    Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
+    String[] schemaVals = new String[] { "{",
+        "  \"type\" : \"record\",",
+        "  \"name\" : \"table1\",",
+        "  \"doc\" : \"Sqoop import of table1\",",
+        "  \"fields\" : [ {",
+        "    \"name\" : \"col1\",",
+        "    \"type\" : [ \"null\", \"string\" ],",
+        "    \"default\" : null,",
+        "    \"columnName\" : \"col1\",",
+        "    \"sqlType\" : \"12\"",
+        "  }, {",
+        "    \"name\" : \"col2\",",
+        "    \"type\" : [ \"null\", \"long\" ],",
+        "    \"default\" : null,",
+        "    \"columnName\" : \"col2\",",
+        "    \"sqlType\" : \"13\"",
+        "  } ],",
+        "  \"tableName\" : \"table1\"",
+        "}"
+    };
+
+    try (FSDataOutputStream stream = fs.create(schemaFile)) {
+      for (String line : schemaVals) {
+        stream.write((line + "\n").getBytes());
+      }
+    }
+    fs.deleteOnExit(schemaFile);
+    return schemaFile;
   }
 
   @AfterClass
@@ -116,6 +163,12 @@ public class TestReplicationWithTableMigration {
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
             SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    if (avroSchemaFile == null) {
+      Path testPath = new Path("/tmp/avro_schema/definition/" + System.nanoTime());
+      DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+      fs.mkdirs(testPath, new FsPermission("777"));
+      avroSchemaFile = PathBuilder.fullyQualifiedHDFSUri(createAvroSchemaFile(fs, testPath), fs);
+    }
   }
 
   @After
@@ -125,39 +178,52 @@ public class TestReplicationWithTableMigration {
   }
 
   private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
-    WarehouseInstance.Tuple tuple =  primary.run("use " + primaryDbName)
-            .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
-            .run("insert into tacid values(1)")
-            .run("insert into tacid values(2)")
-            .run("insert into tacid values(3)")
-            .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " +
-                    "into 3 buckets stored as orc ")
-            .run("alter table tacidpart add partition(country='france')")
-            .run("insert into tacidpart partition(country='india') values('mumbai')")
-            .run("insert into tacidpart partition(country='us') values('sf')")
-            .run("insert into tacidpart partition(country='france') values('paris')")
-            .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
-            .run("insert into tflat values(11)")
-            .run("insert into tflat values(22)")
-            .run("create table tflattext (id int) ")
-            .run("insert into tflattext values(111), (222)")
-            .run("create table tflattextpart (id int) partitioned by (country string) ")
-            .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
-            .run("insert into tflattextpart partition(country='us') values(3333)")
-            .run("create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc  LOCATION '/tmp' ")
-            .run("insert into tacidloc values(1)")
-            .run("insert into tacidloc values(2)")
-            .run("insert into tacidloc values(3)")
-            .run("create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " +
-                    "into 3 buckets stored as orc ")
-            .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'")
-            .run("insert into tacidpartloc partition(country='india') values('mumbai')")
-            .run("insert into tacidpartloc partition(country='us') values('sf')")
-            .run("insert into tacidpartloc partition(country='france') values('paris')")
-            .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " +
-                    "stored as avro tblproperties ('avro.schema.url'='" + primary.avroSchemaFile.toUri().toString() + "')")
-            .run("insert into avro_table values('str1', 10)")
-            .dump(primaryDbName, fromReplId);
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+        .run("insert into tacid values(1)")
+        .run("insert into tacid values(2)")
+        .run("insert into tacid values(3)")
+        .run(
+            "create table tacidpart (place string) partitioned by (country string) clustered by(place) "
+                +
+                "into 3 buckets stored as orc ")
+        .run("alter table tacidpart add partition(country='france')")
+        .run("insert into tacidpart partition(country='india') values('mumbai')")
+        .run("insert into tacidpart partition(country='us') values('sf')")
+        .run("insert into tacidpart partition(country='france') values('paris')")
+        .run(
+            "create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
+        .run("insert into tflat values(11)")
+        .run("insert into tflat values(22)")
+        .run("create table tflattext (id int) ")
+        .run("insert into tflattext values(111), (222)")
+        .run("create table tflattextpart (id int) partitioned by (country string) ")
+        .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
+        .run("insert into tflattextpart partition(country='us') values(3333)")
+        .run(
+            "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc  LOCATION '/tmp' ")
+        .run("insert into tacidloc values(1)")
+        .run("insert into tacidloc values(2)")
+        .run("insert into tacidloc values(3)")
+        .run(
+            "create table tacidpartloc (place string) partitioned by (country string) clustered by(place) "
+                +
+                "into 3 buckets stored as orc ")
+        .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'")
+        .run("insert into tacidpartloc partition(country='india') values('mumbai')")
+        .run("insert into tacidpartloc partition(country='us') values('sf')")
+        .run("insert into tacidpartloc partition(country='france') values('paris')")
+        .run(
+            "create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' "
+                + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri()
+                .toString() + "')")
+        .run("insert into avro_table values ('str1', 10)")
+        .run(
+            "create table avro_table_part partitioned by (country string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' "
+                + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri()
+                .toString() + "')")
+        .run("insert into avro_table_part partition (country='india') values ('another', 13)")
+        .dump(primaryDbName, fromReplId);
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat")));
@@ -165,17 +231,24 @@ public class TestReplicationWithTableMigration {
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc")));
     assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc")));
-    Table avroTable = primary.getTable(primaryDbName, "avro_table");
-    assertFalse(isTransactionalTable(avroTable));
-    assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+    assertAvroTableState(primaryDbName, "avro_table", "avro_table_part");
+    assertAvroTableState(primaryDbName, "avro_table_part");
     return tuple;
   }
 
+  private void assertAvroTableState(String primaryDbName, String... tableNames) throws Exception {
+    for (String tableName : tableNames) {
+      Table avroTable = primary.getTable(primaryDbName, tableName);
+      assertFalse(isTransactionalTable(avroTable));
+      assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+    }
+  }
+
   private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
     replica.run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart",
-                    "tacidloc", "tacidpartloc", "avro_table"})
+                "tacidloc", "tacidpartloc", "avro_table", "avro_table_part" })
             .run("repl status " + replicatedDbName)
             .verifyResult(lastReplId)
             .run("select id from tacid order by id")
@@ -193,7 +266,9 @@ public class TestReplicationWithTableMigration {
             .run("select country from tacidpartloc order by country")
             .verifyResults(new String[] {"france", "india", "us"})
             .run("select col1 from avro_table")
-            .verifyResults(new String[] {"str1"});
+            .verifyResults(new String[] { "str1" })
+            .run("select col1 from avro_table_part")
+            .verifyResults(new String[] { "another" });
 
     assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
     assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
@@ -204,23 +279,29 @@ public class TestReplicationWithTableMigration {
     assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart")));
     assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc")));
     assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc")));
+    assertTablePath(replicatedDbName, "avro_table");
+    assertPartitionPath(replicatedDbName, "avro_table_part");
+  }
 
-    /*Path databasePath = new Path(replica.warehouseRoot, replica.getDatabase(replicatedDbName).getLocationUri());
-    assertEquals(replica.getTable(replicatedDbName, "tacidloc").getSd().getLocation(),
-            new Path(databasePath,"tacidloc").toUri().toString());
-
-    Path tablePath = new Path(databasePath, "tacidpartloc");
-    List<Partition> partitions = replica.getAllPartitions(replicatedDbName, "tacidpartloc");
-    for (Partition part : partitions) {
-      tablePath.equals(new Path(part.getSd().getLocation()).getParent());
-    }*/
+  private void assertPartitionPath(String replicatedDbName, String tableName) throws Exception {
+    Path tablePath = assertTablePath(replicatedDbName, tableName);
+    List<Partition> partitions = replica.getAllPartitions(replicatedDbName, tableName);
+    assertEquals(1, partitions.size());
+    String actualPartitionPath = partitions.iterator().next().getSd().getLocation().toLowerCase();
+    String expectedPartitionPath = new PathBuilder(tablePath.toString())
+        .addDescendant("country=india").build().toUri().toString().toLowerCase();
+    assertEquals(expectedPartitionPath, actualPartitionPath);
+  }
 
-    Table avroTable = replica.getTable(replicatedDbName, "avro_table");
+  private Path assertTablePath(String replicatedDbName, String tableName) throws Exception {
+    Table avroTable = replica.getTable(replicatedDbName, tableName);
     assertTrue(MetaStoreUtils.isExternalTable(avroTable));
-    Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db")
-            .addDescendant("avro_table")
-            .build();
-    assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase());
+    Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString())
+        .addDescendant(replicatedDbName + ".db").addDescendant(tableName).build();
+    String expectedTablePath = tablePath.toUri().toString().toLowerCase();
+    String actualTablePath = avroTable.getSd().getLocation().toLowerCase();
+    assertEquals(expectedTablePath, actualTablePath);
+    return tablePath;
   }
 
   private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
@@ -231,12 +312,12 @@ public class TestReplicationWithTableMigration {
       public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
         injectionPathCalled = true;
         if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
-          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
-                  + " Constraint Table: " + String.valueOf(args.constraintTblName));
+          LOG.warn("Verifier - DB: " + args.dbName
+                  + " Constraint Table: " + args.constraintTblName);
           return false;
         }
         if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          LOG.warn("Verifier - Table: " + args.tblName);
           return args.tblName.equalsIgnoreCase(tbl);
         }
         return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 92f2456..bf4154c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -55,11 +55,8 @@ import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.codehaus.plexus.util.ExceptionUtils;
 import org.slf4j.Logger;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
 import java.io.Closeable;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -75,20 +72,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class WarehouseInstance implements Closeable {
-  final String functionsRoot;
+  final String functionsRoot, repldDir;
   private Logger logger;
   private IDriver driver;
   HiveConf hiveConf;
   MiniDFSCluster miniDFSCluster;
   private HiveMetaStoreClient client;
-  public final Path warehouseRoot;
-  public final Path externalTableWarehouseRoot;
-  public Path avroSchemaFile;
+  final Path warehouseRoot;
+  final Path externalTableWarehouseRoot;
 
   private static int uniqueIdentifier = 0;
 
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
-  private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
       String keyNameForEncryptedZone) throws Exception {
@@ -106,8 +101,14 @@ public class WarehouseInstance implements Closeable {
     }
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
-    initialize(cmRootPath.toString(), warehouseRoot.toString(), externalTableWarehouseRoot.toString(),
-            overridesForHiveConf);
+    String tmpDir = "/tmp/"
+        + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
+        + "_"
+        + System.nanoTime();
+
+    this.repldDir = mkDir(fs, tmpDir + "/hrepl" + uniqueIdentifier + "/").toString();
+    initialize(cmRootPath.toString(), externalTableWarehouseRoot.toString(),
+        warehouseRoot.toString(), overridesForHiveConf);
   }
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster,
@@ -115,18 +116,13 @@ public class WarehouseInstance implements Closeable {
     this(logger, cluster, overridesForHiveConf, null);
   }
 
-  private void initialize(String cmRoot, String warehouseRoot, String externalTableWarehouseRoot,
+  private void initialize(String cmRoot, String externalTableWarehouseRoot, String warehouseRoot,
       Map<String, String> overridesForHiveConf) throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
       hiveConf.set(entry.getKey(), entry.getValue());
     }
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
-    String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
-        + Path.SEPARATOR
-        + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
-        + "_"
-        + System.nanoTime();
     if (metaStoreUri != null) {
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
       return;
@@ -143,8 +139,7 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
     hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
-    hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
-        hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+    hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -158,11 +153,6 @@ public class WarehouseInstance implements Closeable {
 
     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
 
-    Path testPath = new Path(hiveWarehouseLocation);
-    FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf);
-    testPathFileSystem.mkdirs(testPath);
-
-    avroSchemaFile = createAvroSchemaFile(testPathFileSystem, testPath);
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
     client = new HiveMetaStoreClient(hiveConf);
@@ -177,53 +167,10 @@ public class WarehouseInstance implements Closeable {
   private Path mkDir(DistributedFileSystem fs, String pathString)
       throws IOException, SemanticException {
     Path path = new Path(pathString);
-    fs.mkdir(path, new FsPermission("777"));
+    fs.mkdirs(path, new FsPermission("777"));
     return PathBuilder.fullyQualifiedHDFSUri(path, fs);
   }
 
-  private Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
-    Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
-    String[] schemaVals = new String[] { "{",
-            "  \"type\" : \"record\",",
-            "  \"name\" : \"table1\",",
-            "  \"doc\" : \"Sqoop import of table1\",",
-            "  \"fields\" : [ {",
-            "    \"name\" : \"col1\",",
-            "    \"type\" : [ \"null\", \"string\" ],",
-            "    \"default\" : null,",
-            "    \"columnName\" : \"col1\",",
-            "    \"sqlType\" : \"12\"",
-            "  }, {",
-            "    \"name\" : \"col2\",",
-            "    \"type\" : [ \"null\", \"long\" ],",
-            "    \"default\" : null,",
-            "    \"columnName\" : \"col2\",",
-            "    \"sqlType\" : \"13\"",
-            "  } ],",
-            "  \"tableName\" : \"table1\"",
-            "}"
-    };
-    createTestDataFile(schemaFile.toUri().getPath(), schemaVals);
-    return schemaFile;
-  }
-
-  private void createTestDataFile(String filename, String[] lines) throws IOException {
-    FileWriter writer = null;
-    try {
-      File file = new File(filename);
-      file.deleteOnExit();
-      writer = new FileWriter(file);
-      int i=0;
-      for (String line : lines) {
-        writer.write(line + "\n");
-      }
-    } finally {
-      if (writer != null) {
-        writer.close();
-      }
-    }
-  }
-
   public HiveConf getConf() {
     return hiveConf;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index aabc34d..18089d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -702,14 +702,13 @@ public class Context {
    */
   public Path getExternalTmpPath(Path path) {
     URI extURI = path.toUri();
-    if (extURI.getScheme().equals("viewfs")) {
+    if ("viewfs".equals(extURI.getScheme())) {
       // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/..
       // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir
       // on same namespace as tbl dir.
       return getExtTmpPathRelTo(path.getParent());
     }
-    return new Path(getExternalScratchDir(extURI), EXT_PREFIX +
-        nextPathId());
+    return new Path(getExternalScratchDir(extURI), EXT_PREFIX + nextPathId());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 47a802f..40cc576 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask;
+
+
 /**
  * TaskFactory implementation.
  **/
@@ -113,6 +117,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
+    taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
new file mode 100644
index 0000000..efecdb8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class ExternalTableCopyTaskBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class);
+  private final ReplLoadWork work;
+  private final HiveConf conf;
+
+  ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) {
+    this.work = work;
+    this.conf = conf;
+  }
+
+  List<Task<? extends Serializable>> tasks(TaskTracker tracker) {
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Iterator<DirCopyWork> itr = work.getPathsToCopyIterator();
+    while (tracker.canAddMoreTasks() && itr.hasNext()) {
+      DirCopyWork dirCopyWork = itr.next();
+      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+      tasks.add(task);
+      tracker.addTask(task);
+      LOG.debug("added task for {}", dirCopyWork);
+    }
+    return tasks;
+  }
+
+  public static class DirCopyTask extends Task<DirCopyWork> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
+    private static final int MAX_COPY_RETRY = 5;
+
+    @Override
+    protected int execute(DriverContext driverContext) {
+      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+      Path sourcePath = work.fullyQualifiedSourcePath;
+      Path targetPath = work.fullyQualifiedTargetPath;
+      if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+        sourcePath = reservedRawPath(work.fullyQualifiedSourcePath.toUri());
+        targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
+      }
+      int currentRetry = 0;
+      while (currentRetry < MAX_COPY_RETRY) {
+        try {
+          UserGroupInformation ugi = Utils.getUGI();
+          String currentUser = ugi.getShortUserName();
+          boolean usePrivilegedUser =
+              distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+
+          // Should we create a new conf and provide this additional option only here, so that we
+          // avoid data differences between the two locations for the same directories?
+          // Basically, add distcp.options.delete to a new HiveConf object?
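+          // A possible sketch (an assumption, not implemented here): clone the conf and set the
+          // delete option only for this copy, e.g.
+          //   HiveConf copyConf = new HiveConf(conf);
+          //   copyConf.set("distcp.options.delete", "");
+          // and pass copyConf to FileUtils.distCp below so that files removed at the source are
+          // also removed at the target.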
+          FileUtils.distCp(
+              sourcePath.getFileSystem(conf), // source file system
+              Collections.singletonList(sourcePath),  // list of source paths
+              targetPath,
+              false,
+              usePrivilegedUser ? distCpDoAsUser : null,
+              conf,
+              ShimLoader.getHadoopShims());
+          return 0;
+        } catch (Exception e) {
+          if (++currentRetry < MAX_COPY_RETRY) {
+            LOG.warn("unable to copy", e);
+          } else {
+            LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+            setException(e);
+            return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+          }
+        }
+      }
+      LOG.error("should never come here ");
+      return -1;
+    }
+
+    private static Path reservedRawPath(URI uri) {
+      return new Path(uri.getScheme(), uri.getAuthority(),
+          CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+    }
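+
+    // For example (assuming CopyUtils.RAW_RESERVED_VIRTUAL_PATH is the HDFS "/.reserved/raw"
+    // prefix), a source path like hdfs://nn:8020/warehouse/ext/t1 would be rewritten to
+    // hdfs://nn:8020/.reserved/raw/warehouse/ext/t1, which lets distcp copy the raw (still
+    // encrypted) bytes when crossing encryption zones.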
+
+    @Override
+    public StageType getType() {
+      return StageType.REPL_INCREMENTAL_LOAD;
+    }
+
+    @Override
+    public String getName() {
+      return "DIR_COPY_TASK";
+    }
+  }
+
+  @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
+      Explain.Level.DEFAULT,
+      Explain.Level.EXTENDED })
+  public static class DirCopyWork implements Serializable {
+    private final Path fullyQualifiedSourcePath, fullyQualifiedTargetPath;
+
+    public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
+      this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
+      this.fullyQualifiedTargetPath = fullyQualifiedTargetPath;
+    }
+
+    @Override
+    public String toString() {
+      return "DirCopyWork{" +
+          "fullyQualifiedSourcePath=" + fullyQualifiedSourcePath +
+          ", fullyQualifiedTargetPath=" + fullyQualifiedTargetPath +
+          '}';
+    }
+  }
+}


[2/3] hive git commit: HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)

Posted by an...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 5d6ae7f..497e103 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -75,12 +76,15 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
   private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
+  private static final long SLEEP_TIME = 60000;
+
   public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
     private final String name;
     private final String prefix;
@@ -97,7 +101,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
-  private static long sleepTime = 60000;
   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
   private ReplLogger replLogger;
 
@@ -119,11 +122,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       } else {
         lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
       }
-      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
-    } catch (RuntimeException e) {
-      LOG.error("failed", e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+      prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)));
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
@@ -132,8 +131,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }
 
-  private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
-    LOG.debug("prepareReturnValues : " + schema);
+  private void prepareReturnValues(List<String> values) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + dumpSchema);
     for (String s : values) {
       LOG.debug("    > " + s);
     }
@@ -195,6 +194,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         dmd.getDumpFilePath(), conf);
     dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
     dmd.write();
+
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
+        !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+      try (Writer writer = new Writer(dumpRoot, conf)) {
+        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+          Table table = hiveDb.getTable(dbName, tableName);
+          if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+            writer.dataLocationDump(table);
+          }
+        }
+      }
+    }
     return lastReplId;
   }
 
@@ -241,11 +252,27 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
-      try {
+      try (Writer writer = new Writer(dbRoot, conf)) {
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb);
+          try {
+            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName);
+            boolean shouldWriteExternalTableLocationInfo =
+                conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+                && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())
+                && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+            if (shouldWriteExternalTableLocationInfo) {
+              LOG.debug("adding table {} to external tables list", tblName);
+              writer.dataLocationDump(tableTuple.object);
+            }
+            dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+                tableTuple);
+          } catch (InvalidTableException te) {
+            // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+            // Just log a debug message and skip it.
+            LOG.debug(te.getMessage());
+          }
           dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
         }
       } catch (Exception e) {
@@ -293,34 +320,27 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb) throws Exception {
-    try {
-      HiveWrapper.Tuple<Table> tuple = new HiveWrapper(hiveDb, dbName).table(tblName);
-      TableSpec tableSpec = new TableSpec(tuple.object);
-      TableExport.Paths exportPaths =
-          new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
-      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-      tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
-      if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
-        tuple.replicationSpec.setValidTxnList(validTxnList);
-        tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
-
-        // For transactional table, data would be valid snapshot for current txn and doesn't include data
-        // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
-        // as bootstrap dump's last repl Id.
-        tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
-      }
-      MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
-      new TableExport(
-          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
-
-
-      replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
-    } catch (InvalidTableException te) {
-      // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
-      // Just log a debug message and skip it.
-      LOG.debug(te.getMessage());
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId,
+      Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+    TableSpec tableSpec = new TableSpec(tuple.object);
+    TableExport.Paths exportPaths =
+        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
+    String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
+    if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
+      tuple.replicationSpec.setValidTxnList(validTxnList);
+      tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+
+      // For transactional table, data would be valid snapshot for current txn and doesn't include data
+      // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
+      // as bootstrap dump's last repl Id.
+      tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
     }
+    MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+    new TableExport(
+        exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
+
+    replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
   }
 
   private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
@@ -365,7 +385,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
       // Wait for 1 minute and check again.
       try {
-        Thread.sleep(sleepTime);
+        Thread.sleep(SLEEP_TIME);
       } catch (InterruptedException e) {
         LOG.info("REPL DUMP thread sleep interrupted", e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
new file mode 100644
index 0000000..012df9d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Format of the file used to dump information about external tables:
+ * <p>
+ * table_name1,[base64Encoded(table_dir_location)]\n
+ *
+ * The file generated here is used exclusively for the data copy of external tables, hence writing
+ * this file is handled differently from regular event handling for replication, based on the
+ * conditions specified in {@link org.apache.hadoop.hive.ql.parse.repl.dump.Utils#shouldReplicate}
+ */
+public final class ReplExternalTables {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class);
+  private static final String FIELD_SEPARATOR = ",";
+  public static final String FILE_NAME = "_external_tables_info";
+  private static final int MAX_RETRIES = 5;
+
+  private ReplExternalTables(){}
+
+  public static String externalTableLocation(HiveConf hiveConf, String location) {
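+    // Illustrative example (made-up host names): for an incoming location
+    // hdfs://source-nn:8020/warehouse/ext/t1 and REPL_EXTERNAL_TABLE_BASE_DIR set to
+    // hdfs://replica-nn:8020/replicaBase, the returned location is
+    // hdfs://replica-nn:8020/replicaBase/warehouse/ext/t1.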
+    String currentPath = new Path(location).toUri().getPath();
+    String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+    URI basePath = new Path(baseDir).toUri();
+    String dataPath = currentPath.replaceFirst(Path.SEPARATOR, basePath.getPath() + Path.SEPARATOR);
+    Path dataLocation = new Path(basePath.getScheme(), basePath.getAuthority(), dataPath);
+    LOG.debug("incoming location: {} , new location: {}", location, dataLocation.toString());
+    return dataLocation.toString();
+  }
+
+  public static class Writer implements Closeable {
+    private static Logger LOG = LoggerFactory.getLogger(Writer.class);
+    private final HiveConf hiveConf;
+    private final Path writePath;
+    private final boolean excludeExternalTables, dumpMetadataOnly;
+    private OutputStream writer;
+
+    Writer(Path dbRoot, HiveConf hiveConf) throws IOException {
+      this.hiveConf = hiveConf;
+      writePath = new Path(dbRoot, FILE_NAME);
+      excludeExternalTables = !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
+      dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+      if (shouldWrite()) {
+        this.writer = FileSystem.get(hiveConf).create(writePath);
+      }
+    }
+
+    private boolean shouldWrite() {
+      return !dumpMetadataOnly && !excludeExternalTables;
+    }
+
+    /**
+     * Dumps a single line per external table. Additional lines are written for the same table
+     * if the table is partitioned and a partition location lies outside the table location.
+     */
+    void dataLocationDump(Table table)
+        throws InterruptedException, IOException, HiveException {
+      if (!shouldWrite()) {
+        return;
+      }
+      if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+        throw new IllegalArgumentException(
+            "only external tables can be written via this writer; the provided table is " + table
+                .getTableType());
+      }
+      Path fullyQualifiedDataLocation =
+          PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
+      write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+      if (table.isPartitioned()) {
+        List<Partition> partitions = Hive.get(hiveConf).getPartitions(table);
+        for (Partition partition : partitions) {
+          boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree(
+              partition.getDataLocation(), table.getDataLocation()
+          );
+          if (partitionLocOutsideTableLoc) {
+            fullyQualifiedDataLocation = PathBuilder
+                .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf));
+            write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+          }
+        }
+      }
+    }
+
+    private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf)
+        throws IOException, SemanticException {
+      StringWriter lineToWrite = new StringWriter();
+      lineToWrite.append(tableName).append(FIELD_SEPARATOR);
+      Path dataLocation =
+          PathBuilder.fullyQualifiedHDFSUri(dataLoc, dataLoc.getFileSystem(hiveConf));
+      byte[] encodedBytes = Base64.getEncoder()
+          .encode(dataLocation.toString().getBytes(StandardCharsets.UTF_8));
+      String encodedPath = new String(encodedBytes, StandardCharsets.UTF_8);
+      lineToWrite.append(encodedPath).append("\n");
+      return lineToWrite.toString();
+    }
+
+    private void write(String line) throws InterruptedException {
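+      // Note on the retry behaviour below: failures back off quadratically (100ms, 400ms, 900ms,
+      // 1600ms between attempts) and the file is re-opened in append mode; the MAX_RETRIES-th
+      // consecutive failure is fatal.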
+      int currentRetry = 0;
+      while (currentRetry < MAX_RETRIES) {
+        try {
+          writer.write(line.getBytes(StandardCharsets.UTF_8));
+          break;
+        } catch (IOException e) {
+          currentRetry++;
+          if (currentRetry < MAX_RETRIES) {
+            LOG.warn("failed to write data, attempt {} of {}", currentRetry, MAX_RETRIES, e);
+          } else {
+            LOG.error("failed to write data after {} attempts", currentRetry, e);
+            throw new RuntimeException("failed to write data", e);
+          }
+          Thread.sleep(100 * currentRetry * currentRetry);
+          writer = openWriterAppendMode();
+        }
+      }
+    }
+
+    private OutputStream openWriterAppendMode() {
+      try {
+        // not sure if the exception was due to an incorrect state within the writer, hence close it
+        close();
+        return FileSystem.get(hiveConf).append(writePath);
+      } catch (IOException e1) {
+        String message = "failed to open the file " + writePath + " in append mode";
+        LOG.error(message, e1);
+        throw new IllegalStateException(message, e1);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+  public static class Reader {
+    private static Logger LOG = LoggerFactory.getLogger(Reader.class);
+    private final HiveConf hiveConf;
+    private final Path rootReplLoadPath;
+    private final boolean isIncrementalPhase;
+
+    public Reader(HiveConf conf, Path rootReplLoadPath, boolean isIncrementalPhase) {
+      this.hiveConf = conf;
+      this.rootReplLoadPath = rootReplLoadPath;
+      this.isIncrementalPhase = isIncrementalPhase;
+    }
+
+    /**
+     * Currently we only support dump/load of a single db, and the db dump location cannot be
+     * inferred from the incoming db name or pattern value since the load db name can be different
+     * from the target db name. Hence traverse one level down from rootReplLoadPath to look for the
+     * file providing the hdfs locations.
+     */
+    public Set<String> sourceLocationsToCopy() throws IOException {
+      if (isIncrementalPhase) {
+        return sourceLocationsToCopy(new Path(rootReplLoadPath, FILE_NAME));
+      }
+
+      // this is bootstrap load path
+      Set<String> locationsToCopy = new HashSet<>();
+      FileSystem fileSystem = rootReplLoadPath.getFileSystem(hiveConf);
+      FileStatus[] fileStatuses = fileSystem.listStatus(rootReplLoadPath);
+      for (FileStatus next : fileStatuses) {
+        if (next.isDirectory()) {
+          Path externalTableInfoPath = new Path(next.getPath(), FILE_NAME);
+          locationsToCopy.addAll(sourceLocationsToCopy(externalTableInfoPath));
+        }
+      }
+      return locationsToCopy;
+    }
+
+    private BufferedReader reader(FileSystem fs, Path externalTableInfo) throws IOException {
+      InputStreamReader in = new InputStreamReader(fs.open(externalTableInfo), StandardCharsets.UTF_8);
+      return new BufferedReader(in);
+    }
+
+    /**
+     * The set of source locations should never be created based on the table name in
+     * {@link #FILE_NAME} since there can be multiple entries for the same table when the table is
+     * partitioned and a partition was added with its own location, different from the table
+     * location.
+     */
+    private Set<String> sourceLocationsToCopy(Path externalTableInfo) throws IOException {
+      Set<String> locationsToCopy = new HashSet<>();
+      FileSystem fileSystem = externalTableInfo.getFileSystem(hiveConf);
+      if (!fileSystem.exists(externalTableInfo)) {
+        return locationsToCopy;
+      }
+
+      int currentRetry = 0;
+      BufferedReader reader = null;
+      while (currentRetry < MAX_RETRIES) {
+        try {
+          reader = reader(fileSystem, externalTableInfo);
+          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+            String[] splits = line.split(FIELD_SEPARATOR);
+            locationsToCopy
+                .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8));
+          }
+          return locationsToCopy;
+        } catch (IOException e) {
+          currentRetry++;
+          if (currentRetry < MAX_RETRIES) {
+            closeQuietly(reader);
+            LOG.warn("failed to read {}", externalTableInfo.toString(), e);
+          } else {
+            LOG.error("failed to read {}", externalTableInfo.toString(), e);
+            throw e;
+          }
+        } finally {
+          closeQuietly(reader);
+        }
+      }
+      // we should never reach here
+      throw new IllegalStateException("we should never reach this condition");
+    }
+
+    private static void closeQuietly(BufferedReader reader) {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.debug("error while closing reader ", e);
+      }
+    }
+  }
+}
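
For reference, the following is a minimal, standalone sketch (not part of the patch) showing how a
line of the _external_tables_info file can be encoded and decoded, mirroring the logic of
Writer.lineFor() and the Reader parsing loop above; the class name, table name and HDFS location
are made-up illustrative values.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class ExternalTablesInfoLineSketch {
      private static final String FIELD_SEPARATOR = ",";

      public static void main(String[] args) {
        // hypothetical table name and source data location
        String tableName = "sales";
        String location = "hdfs://source-nn:8020/warehouse/external/sales";

        // encode: table_name,base64(location)\n
        String line = tableName + FIELD_SEPARATOR
            + Base64.getEncoder().encodeToString(location.getBytes(StandardCharsets.UTF_8)) + "\n";

        // decode: split on the separator and base64-decode the second field
        String[] splits = line.trim().split(FIELD_SEPARATOR);
        String decodedLocation =
            new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8);

        System.out.println(splits[0] + " -> " + decodedLocation);
      }
    }

Base64 is used for the location field, presumably so that the separator and newline can never clash
with characters that may appear in the path.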

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 270670d..2126aab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -228,8 +228,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
           createEndReplLogTask(context, scope, iterator.replLogger());
         }
       }
-      boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext();
+
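+      // hand any remaining task capacity to external table data copy tasks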
+      if (loadTaskTracker.canAddMoreTasks()) {
+        scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+      }
+
+      boolean addAnotherLoadTask = iterator.hasNext()
+          || loadTaskTracker.hasReplicationState()
+          || constraintIterator.hasNext()
+          || work.getPathsToCopyIterator().hasNext();
+
       if (addAnotherLoadTask) {
         createBuilderTask(scope.rootTasks);
       }
@@ -300,7 +308,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
   private void partitionsPostProcessing(BootstrapEventsIterator iterator,
       Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
-      TaskTracker partitionsTracker) throws SemanticException {
+      TaskTracker partitionsTracker) {
     setUpDependencies(tableTracker, partitionsTracker);
     if (!scope.database && !scope.table) {
       scope.rootTasks.addAll(partitionsTracker.tasks());
@@ -343,8 +351,43 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
-      IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
-      this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG, work));
+      List<Task<? extends Serializable>> childTasks = new ArrayList<>();
+      int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
+      // during incremental load there is no parallelism from replication tasks since they are
+      // event based and hence linear. To achieve parallelism we have to use copy tasks (which have
+      // no DAG) for all threads except one, in the execution phase.
+      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+      IncrementalLoadTasksBuilder builder = work.getIncrementalLoadTaskBuilder();
+
+      // If the total number of tasks that can be created is not more than the achievable
+      // parallelism, give the entire capacity to one kind of task; else split it to achieve
+      // maximum parallelism.
+      int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
+      if (maxTasks <= parallelism) {
+        if (builder.hasMoreWork()) {
+          calculatedMaxNumOfTasks = maxTasks;
+        } else {
+          maxNumOfHDFSTasks = maxTasks;
+        }
+      } else {
+        calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
+        maxNumOfHDFSTasks = parallelism - 1;
+      }
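+      // e.g. (illustrative numbers) with REPL_APPROX_MAX_LOAD_TASKS=10000 and
+      // EXECPARALLETHREADNUMBER=8: 10000 - 8 + 1 = 9993 event tasks and 8 - 1 = 7 copy tasks.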
+      TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
+      Task<? extends Serializable> incrementalLoadTaskRoot =
+          builder.build(driverContext, getHive(), LOG, work, trackerForReplIncremental);
+      // the incremental load task is added first so that it is always processed first,
+      // followed by dir copy tasks if capacity allows.
+      childTasks.add(incrementalLoadTaskRoot);
+
+      TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
+      childTasks
+          .addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
+
+      // either the incremental has more work or the external table file copy has more paths to process
+      if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext()) {
+        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+      }
+      this.childTasks = childTasks;
       return 0;
     } catch (Exception e) {
       LOG.error("failed replication", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index ff21b6a..e86a5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hive.ql.exec.Task;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
 
 @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
@@ -37,13 +40,14 @@ public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
   final String tableNameToLoadIn;
   final String dumpDirectory;
-  private final transient BootstrapEventsIterator bootstrapIterator;
   private final ConstraintEventsIterator constraintsIterator;
-  private final transient IncrementalLoadEventsIterator incrementalIterator;
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
+  private final transient BootstrapEventsIterator bootstrapIterator;
+  private final transient IncrementalLoadEventsIterator incrementalIterator;
   private final transient IncrementalLoadTasksBuilder incrementalLoad;
   private transient Task<? extends Serializable> rootTask;
+  private final transient Iterator<DirCopyWork> pathsToCopyIterator;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -53,7 +57,8 @@ public class ReplLoadWork implements Serializable {
   final LineageState sessionStateLineageState;
 
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
-      String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
+      String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo,
+      List<DirCopyWork> pathsToCopyIterator) throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
@@ -63,7 +68,8 @@ public class ReplLoadWork implements Serializable {
       incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf);
       this.bootstrapIterator = null;
       this.constraintsIterator = null;
-      incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
+      incrementalLoad =
+          new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
               incrementalIterator, hiveConf, eventTo);
     } else {
       this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
@@ -71,6 +77,7 @@ public class ReplLoadWork implements Serializable {
       incrementalIterator = null;
       incrementalLoad = null;
     }
+    this.pathsToCopyIterator = pathsToCopyIterator.iterator();
   }
 
   public BootstrapEventsIterator iterator() {
@@ -116,4 +123,8 @@ public class ReplLoadWork implements Serializable {
   public void setRootTask(Task<? extends Serializable> rootTask) {
     this.rootTask = rootTask;
   }
+
+  public Iterator<DirCopyWork> getPathsToCopyIterator() {
+    return pathsToCopyIterator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index ebe0090..60ad6d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 /**
  * Replication layout is from the root directory of replication Dump is
  * db
+ *    _external_tables_info
  *    table1
  *        _metadata
  *        data

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index d909945..e0f8f72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -120,6 +121,11 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
       if (replicationState == null && next == null) {
         while (remoteIterator.hasNext()) {
           LocatedFileStatus next = remoteIterator.next();
+          // we want to skip this file; this also means there can't be a table whose name matches
+          // the constant ReplExternalTables.FILE_NAME
+          if (next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) {
+            continue;
+          }
           if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
             String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
             List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 357c693..599eb04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -72,30 +71,50 @@ public class FSTableEvent implements TableEvent {
     return fromPath;
   }
 
+  /**
+   * To determine if the tableDesc is for an external table,
+   * use {@link ImportTableDesc#isExternal()}
+   * and not the {@link ImportTableDesc#tableType()} method.
+   */
   @Override
   public ImportTableDesc tableDesc(String dbName) throws SemanticException {
     try {
       Table table = new Table(metadata.getTable());
+      boolean externalTableOnSource = TableType.EXTERNAL_TABLE.equals(table.getTableType());
       // The table can be non acid in case of replication from 2.6 cluster.
       if (!AcidUtils.isTransactionalTable(table)
               && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES)
               && (table.getTableType() == TableType.MANAGED_TABLE)) {
         Hive hiveDb = Hive.get(hiveConf);
         //TODO : dump metadata should be read to make sure that migration is required.
-        HiveStrictManagedMigration.TableMigrationOption migrationOption
-                = HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
-                table.getTableType(),null, (Configuration)hiveConf,
-                hiveDb.getMSC(),true);
+        HiveStrictManagedMigration.TableMigrationOption migrationOption =
+            HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
+                table.getTableType(), null, hiveConf,
+                hiveDb.getMSC(), true);
         HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(),
                 migrationOption, false,
-                getHiveUpdater(hiveConf), hiveDb.getMSC(), (Configuration)hiveConf);
+                getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf);
         // If the conversion is from non transactional to transactional table
         if (AcidUtils.isTransactionalTable(table)) {
           replicationSpec().setMigratingToTxnTable();
         }
+        if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+          // Since the table has been converted to an external table by the migration rules, its
+          // location has to be set to null so that the location on the target is picked up based
+          // on the default configuration.
+          table.setDataLocation(null);
+          if (!externalTableOnSource) {
+            replicationSpec().setMigratingToExternalTable();
+          }
+        }
       }
       ImportTableDesc tableDesc
               = new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table);
+      if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+        tableDesc.setLocation(
+            table.getDataLocation() == null ? null : table.getDataLocation().toString());
+        tableDesc.setExternal(true);
+      }
       tableDesc.setReplicationSpec(replicationSpec());
       if (table.getTableType() == TableType.EXTERNAL_TABLE) {
         tableDesc.setExternal(true);
@@ -150,8 +169,17 @@ public class FSTableEvent implements TableEvent {
       partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
       partDesc.setBucketCols(partition.getSd().getBucketCols());
       partDesc.setSortCols(partition.getSd().getSortCols());
-      partDesc.setLocation(new Path(fromPath,
-          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+      if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) {
+        // we have to provide the source location so the target location can be derived.
+        partDesc.setLocation(partition.getSd().getLocation());
+      } else {
+        /**
+         * This is required for listing all the files in a partition of a managed table, as described in
+         * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator}
+         */
+        partDesc.setLocation(new Path(fromPath,
+            Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+      }
       partsDesc.setReplicationSpec(replicationSpec());
       return partsDesc;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 2e895a8..e182f31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -18,15 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
@@ -54,7 +55,6 @@ import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
@@ -83,13 +83,13 @@ public class LoadPartitions {
 
   public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
                         TableEvent event, String dbNameToLoadIn,
-                        TableContext tableContext) throws HiveException, IOException {
+                        TableContext tableContext) throws HiveException {
     this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
   }
 
   public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
                         TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
-                        AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
+                        AddPartitionDesc lastReplicatedPartition) throws HiveException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
@@ -101,26 +101,15 @@ public class LoadPartitions {
     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
   }
 
-  private String location() throws MetaException, HiveException {
-    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
-    if (!tableContext.waitOnPrecursor()) {
-      return context.warehouse.getDefaultTablePath(
-          parentDb, tableDesc.getTableName(), tableDesc.isExternal()).toString();
-    } else {
-      Path tablePath = context.warehouse.getDefaultTablePath(
-          tableDesc.getDatabaseName(), tableDesc.getTableName(), tableDesc.isExternal());
-      return context.warehouse.getDnsPath(tablePath).toString();
-    }
-  }
-
   public TaskTracker tasks() throws SemanticException {
     try {
       /*
       We are doing this both in load table and load partitions
        */
-      if (tableDesc.getLocation() == null) {
-        tableDesc.setLocation(location());
-      }
+      Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+      LoadTable.TableLocationTuple tableLocationTuple =
+          LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
+      tableDesc.setLocation(tableLocationTuple.location);
 
       if (table == null) {
         //new table
@@ -157,7 +146,7 @@ public class LoadPartitions {
     }
   }
 
-  private void updateReplicationState(ReplicationState replicationState) throws SemanticException {
+  private void updateReplicationState(ReplicationState replicationState) {
     if (!tracker.canAddMoreTasks()) {
       tracker.setReplicationState(replicationState);
     }
@@ -203,12 +192,26 @@ public class LoadPartitions {
    * returns the root task for adding a partition
    */
   private Task<?> tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
-          throws MetaException, IOException, HiveException {
+          throws MetaException, HiveException {
+    AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+
     Task<?> addPartTask = TaskFactory.get(
             new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
             context.hiveConf
     );
-    if (event.replicationSpec().isMetadataOnly()) {
+
+    boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
+        || (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+        && !event.replicationSpec().isMigratingToExternalTable()
+    );
+
+    if (isOnlyDDLOperation) {
       if (ptnRootTask == null) {
         ptnRootTask = addPartTask;
       } else {
@@ -217,16 +220,7 @@ public class LoadPartitions {
       return ptnRootTask;
     }
 
-    AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-            + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-            + partSpec.getLocation());
-
-    Path tmpPath = replicaWarehousePartitionLocation;
-
+    Path stagingDir = replicaWarehousePartitionLocation;
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
     if (event.replicationSpec().isInReplicationScope() &&
@@ -236,25 +230,27 @@ public class LoadPartitions {
         // Migrating to transactional tables in bootstrap load phase.
         // It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
         // ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata.
-        tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
+        stagingDir = new Path(stagingDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
       }
     } else {
-      loadFileType = (event.replicationSpec().isReplace() || event.replicationSpec().isMigratingToTxnTable())
-              ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
+      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
+          (event.replicationSpec().isMigratingToTxnTable()
+              ? LoadFileType.KEEP_EXISTING
+              : LoadFileType.OVERWRITE_EXISTING);
+      stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     }
 
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
         sourceWarehousePartitionLocation,
-        tmpPath,
+        stagingDir,
         context.hiveConf
     );
 
     Task<?> movePartitionTask = null;
     if (loadFileType != LoadFileType.IGNORE) {
       // no need to create move task, if file is moved directly to target location.
-      movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
     }
 
     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
@@ -321,9 +317,26 @@ public class LoadPartitions {
     return TaskFactory.get(moveWork, context.hiveConf);
   }
 
+  /**
+   * Since the table level location is set taking into account the base directory configuration for
+   * external tables, nothing specific needs to be done for the partition location since it will
+   * always be a child of the table level location.
+   * It appears that replication does not handle a specific location provided for a partition, and
+   * the partition path will always be created as a child on the target.
+   */
+
   private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)
-      throws MetaException, HiveException, IOException {
+      throws MetaException, HiveException {
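+    // For external tables (unless migrating from a managed table), the partition location is
+    // derived by remapping the source partition location under REPL_EXTERNAL_TABLE_BASE_DIR,
+    // see externalTableLocation().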
     String child = Warehouse.makePartPath(partSpec.getPartSpec());
+    if (tableDesc.isExternal()) {
+      if (event.replicationSpec().isMigratingToExternalTable()) {
+        return new Path(tableDesc.getLocation(), child);
+      }
+      String externalLocation =
+          ReplExternalTables.externalTableLocation(context.hiveConf, partSpec.getLocation());
+      return new Path(externalLocation);
+    }
+
     if (tableDesc.getLocation() == null) {
       if (table.getDataLocation() == null) {
         Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 520b410..e0f0979 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -20,15 +20,17 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
@@ -54,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.BitSet;
 import java.util.Collections;
@@ -75,8 +76,7 @@ public class LoadTable {
   private final TableEvent event;
 
   public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
-                   TableContext tableContext, TaskTracker limiter)
-      throws SemanticException, IOException {
+      TableContext tableContext, TaskTracker limiter) {
     this.event = event;
     this.context = context;
     this.replLogger = replLogger;
@@ -128,9 +128,9 @@ public class LoadTable {
           break;
       }
 
-      if (tableDesc.getLocation() == null) {
-        tableDesc.setLocation(location(tableDesc, parentDb));
-      }
+      TableLocationTuple
+          tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
+      tableDesc.setLocation(tableLocationTuple.location);
 
   /* Note: In the following section, Metadata-only import handling logic is
      interleaved with regular repl-import logic. The rule of thumb being
@@ -141,7 +141,7 @@ public class LoadTable {
      or in the case of an unpartitioned table. In all other cases, it should
      behave like a noop or a pure MD alter.
   */
-      newTableTasks(tableDesc, tblRootTask);
+      newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
 
       // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
       // bootstrap, we skip current table update.
@@ -173,7 +173,8 @@ public class LoadTable {
     return ReplLoadOpType.LOAD_REPLACE;
   }
 
-  private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask) throws Exception {
+  private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple)
+      throws Exception {
     Table table = tblDesc.toTable(context.hiveConf);
     ReplicationSpec replicationSpec = event.replicationSpec();
     Task<?> createTableTask =
@@ -208,26 +209,58 @@ public class LoadTable {
       parentTask.addDependentTask(replTxnTask);
       parentTask = replTxnTask;
     }
-    if (!isPartitioned(tblDesc)) {
+    boolean shouldCreateLoadTableTask = (
+        !isPartitioned(tblDesc)
+            && !TableType.EXTERNAL_TABLE.equals(table.getTableType())
+    ) || tuple.isConvertedFromManagedToExternal;
+    if (shouldCreateLoadTableTask) {
       LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
-      Task<?> loadTableTask =
-          loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
+      Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
               event.metadataPath());
       parentTask.addDependentTask(loadTableTask);
     }
     tracker.addTask(tblRootTask);
   }
 
-  private String location(ImportTableDesc tblDesc, Database parentDb)
-      throws MetaException, SemanticException {
-    if (!tableContext.waitOnPrecursor()) {
-      return context.warehouse.getDefaultTablePath(
-          parentDb, tblDesc.getTableName(), tblDesc.isExternal()).toString();
+  static class TableLocationTuple {
+    final String location;
+    private final boolean isConvertedFromManagedToExternal;
+
+    TableLocationTuple(String location, boolean isConvertedFromManagedToExternal) {
+      this.location = location;
+      this.isConvertedFromManagedToExternal = isConvertedFromManagedToExternal;
+    }
+  }
+
+  static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parentDb,
+      TableContext tableContext, Context context) throws MetaException, SemanticException {
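+    // Summary of the cases handled below:
+    //   1. external table with no source location: default warehouse path (table was migrated from managed);
+    //   2. external table with a source location: remapped under REPL_EXTERNAL_TABLE_BASE_DIR;
+    //   3. managed table: default warehouse path on the target.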
+    Warehouse wh = context.warehouse;
+    Path defaultTablePath;
+    if (parentDb == null) {
+      defaultTablePath = wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(),
+          tblDesc.isExternal());
     } else {
-      Path tablePath = context.warehouse.getDefaultTablePath(
-          tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal());
-      return context.warehouse.getDnsPath(tablePath).toString();
+      defaultTablePath = wh.getDefaultTablePath(
+          parentDb, tblDesc.getTableName(), tblDesc.isExternal()
+      );
+    }
+    // don't use TableType.EXTERNAL_TABLE.equals(tblDesc.tableType()) since the type always comes in as managed for tables here.
+    if (tblDesc.isExternal()) {
+      if (tblDesc.getLocation() == null) {
+        // this is the case where the table was converted to an external table by the migration
+        // rules applied when replicating across different versions of Hive.
+        return new TableLocationTuple(wh.getDnsPath(defaultTablePath).toString(), true);
+      }
+      String currentLocation = new Path(tblDesc.getLocation()).toUri().getPath();
+      String newLocation =
+          ReplExternalTables.externalTableLocation(context.hiveConf, currentLocation);
+      LOG.debug("external table {} data location is: {}", tblDesc.getTableName(), newLocation);
+      return new TableLocationTuple(newLocation, false);
     }
+    Path path = tableContext.waitOnPrecursor()
+        ? wh.getDnsPath(defaultTablePath)
+        : wh.getDefaultTablePath(parentDb, tblDesc.getTableName(), tblDesc.isExternal());
+    return new TableLocationTuple(path.toString(), false);
   }
 
   private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index ae6411d..7ae33e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -92,14 +92,13 @@ public class IncrementalLoadTasksBuilder {
   }
 
   public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log,
-                                            ReplLoadWork loadWork) throws Exception {
+      ReplLoadWork loadWork, TaskTracker tracker) throws Exception {
     Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
     Task<? extends Serializable> taskChainTail = evTaskRoot;
     Long lastReplayedEvent = null;
     this.log = log;
     numIteration++;
     this.log.debug("Iteration num " + numIteration);
-    TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
 
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       FileStatus dir = iterator.next();
@@ -153,10 +152,7 @@ public class IncrementalLoadTasksBuilder {
       lastReplayedEvent = eventDmd.getEventTo();
     }
 
-    if (iterator.hasNext()) {
-      // add load task to start the next iteration
-      taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
-    } else {
+    if (!hasMoreWork()) {
       // if no events were replayed, then add a task to update the last repl id of the database/table to last event id.
       if (taskChainTail == evTaskRoot) {
         String lastEventid = eventTo.toString();
@@ -177,10 +173,17 @@ public class IncrementalLoadTasksBuilder {
       this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
               taskChainTail.getClass(), taskChainTail.getId(),
               barrierTask.getClass(), barrierTask.getId());
+      if (loadWork.getPathsToCopyIterator().hasNext()) {
+        taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
+      }
     }
     return evTaskRoot;
   }
 
+  public boolean hasMoreWork() {
+    return iterator.hasNext();
+  }
+
   private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
     if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
       String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 014192b..4fdd12a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -108,7 +108,8 @@ public class ReplUtils {
 
   public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
           throws SemanticException {
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+    TableType tableType = tableDesc.isExternal() ? TableType.EXTERNAL_TABLE : tableDesc.tableType();
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableType);
     return TaskFactory.get(replLogWork, conf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 26f21cf..89b2db3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -561,7 +561,7 @@ public class Table implements Serializable {
 
   public void setDataLocation(Path path) {
     this.path = path;
-    tTable.getSd().setLocation(path.toString());
+    tTable.getSd().setLocation(path == null ? null : path.toString());
   }
 
   public void unsetDataLocation() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 833757c..fb31254 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.text.ParseException;
 import java.util.ArrayList;
@@ -118,7 +119,7 @@ public abstract class BaseSemanticAnalyzer {
   protected final Hive db;
   protected final HiveConf conf;
   protected final QueryState queryState;
-  protected List<Task<?>> rootTasks;
+  protected List<Task<? extends Serializable>> rootTasks;
   protected FetchTask fetchTask;
   protected final Logger LOG;
   protected final LogHelper console;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9c78108..a843987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -67,8 +69,8 @@ import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.thrift.TException;
-import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -92,6 +94,8 @@ import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveU
  */
 public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ImportSemanticAnalyzer.class);
+
   public ImportSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
@@ -283,7 +287,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
         if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
+          replicationSpec.setMigratingToExternalTable();
           tblDesc.setExternal(true);
+          // set this to null so that the default location for external tables is chosen on the target
+          tblDesc.setLocation(null);
         }
       } else {
         tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
@@ -308,6 +315,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     if (isLocationSet) {
+      STATIC_LOG.debug("table {} location is {}", tblDesc.getTableName(), parsedLocation);
       tblDesc.setLocation(parsedLocation);
       x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
     }
@@ -316,11 +324,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       tblDesc.setTableName(parsedTableName);
     }
 
-    List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+    List<AddPartitionDesc> partitionDescs = new ArrayList<>();
     Iterable<Partition> partitions = rv.getPartitions();
     for (Partition partition : partitions) {
       // TODO: this should ideally not create AddPartitionDesc per partition
-      AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+      AddPartitionDesc partsDesc =
+          getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
+              replicationSpec, x.getConf());
       if (inReplicationScope){
         StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
       }
@@ -387,24 +397,25 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
-    if (!inReplicationScope) {
+    if (inReplicationScope) {
+      createReplImportTasks(
+          tblDesc, partitionDescs,
+          replicationSpec, waitOnPrecursor, table,
+          fromURI, wh, x, writeId, stmtId, updatedMetadata);
+    } else {
       createRegularImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, table,
           fromURI, fs, wh, x, writeId, stmtId);
-    } else {
-      createReplImportTasks(
-          tblDesc, partitionDescs,
-          replicationSpec, waitOnPrecursor, table,
-          fromURI, fs, wh, x, writeId, stmtId, updatedMetadata);
     }
     return tableExists;
   }
 
   private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
-      Path fromPath, String dbname,
-      ImportTableDesc tblDesc, Partition partition) throws MetaException, SemanticException {
-    AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
+      Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
+      ReplicationSpec replicationSpec, HiveConf conf)
+      throws MetaException, SemanticException {
+    AddPartitionDesc partsDesc = new AddPartitionDesc(dbName, tblDesc.getTableName(),
         EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
         partition.getSd().getLocation(), partition.getParameters());
     AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
@@ -416,16 +427,23 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
     partDesc.setBucketCols(partition.getSd().getBucketCols());
     partDesc.setSortCols(partition.getSd().getSortCols());
-    partDesc.setLocation(new Path(fromPath,
-        Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+    if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
+        && !replicationSpec.isMigratingToExternalTable()) {
+      String newLocation = ReplExternalTables
+          .externalTableLocation(conf, partition.getSd().getLocation());
+      LOG.debug("partition {} has data location: {}", partition, newLocation);
+      partDesc.setLocation(newLocation);
+    } else {
+      partDesc.setLocation(new Path(fromPath,
+          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+    }
     return partsDesc;
   }
 
   private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       org.apache.hadoop.hive.metastore.api.Table tblObj) throws Exception {
     Table table = new Table(tblObj);
-    ImportTableDesc tblDesc = new ImportTableDesc(dbName, table);
-    return tblDesc;
+    return new ImportTableDesc(dbName, table);
   }
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
@@ -543,8 +561,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private static Task<? extends Serializable> alterSinglePartition(
-      URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
-      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+      ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
       ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
       EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
     addPartitionDesc.setReplaceMode(true);
@@ -553,14 +570,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     if (ptn == null) {
-      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
-    } else {
+      fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
+    } else if (!externalTablePartition(tblDesc, replicationSpec)) {
       partSpec.setLocation(ptn.getLocation()); // use existing location
     }
     return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
   }
 
-  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
+  private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
       EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
       throws MetaException, IOException, HiveException {
@@ -579,7 +596,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
-      fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+      fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
           + partSpecToString(partSpec.getPartSpec())
           + " with source location: " + srcLocation);
@@ -604,7 +621,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
                 LoadFileType.REPLACE_ALL :
                 replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
         //Replication scope the write id will be invalid
-        Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+        boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
                 replicationSpec.isInReplicationScope();
         destPath =  useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
                 : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
@@ -683,12 +700,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   /**
    * Helper method to set location properly in partSpec
    */
-  private static void fixLocationInPartSpec(
-      FileSystem fs, ImportTableDesc tblDesc, Table table,
-      Warehouse wh, ReplicationSpec replicationSpec,
-      AddPartitionDesc.OnePartitionDesc partSpec,
+  private static void fixLocationInPartSpec(ImportTableDesc tblDesc, Table table,
+      Warehouse wh, ReplicationSpec replicationSpec, AddPartitionDesc.OnePartitionDesc partSpec,
       EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException {
-    Path tgtPath = null;
+    if (externalTablePartition(tblDesc, replicationSpec)) {
+      /*
+        We use isExternal() rather than tableType() since the latter always reports the type here
+        as a managed table. For external table partitions there is nothing to do: the path is
+        already set correctly in {@link org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler}
+       */
+      if (replicationSpec.isMigratingToExternalTable()) {
+        // at this point the table.getDataLocation() should be set already for external tables
+        // using the correct values of default warehouse external table location on target.
+        partSpec.setLocation(new Path(tblDesc.getLocation(),
+            Warehouse.makePartPath(partSpec.getPartSpec())).toString());
+        LOG.debug("partition spec {} has location set to {} for a table migrating to external table"
+                + " from managed table",
+            StringUtils.join(partSpec.getPartSpec().entrySet(), ","),
+            partSpec.getLocation()
+        );
+      }
+      return;
+    }
+    Path tgtPath;
     if (tblDesc.getLocation() == null) {
       if (table.getDataLocation() != null) {
         tgtPath = new Path(table.getDataLocation().toString(),
@@ -708,6 +742,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     partSpec.setLocation(tgtPath.toString());
   }
 
+  private static boolean externalTablePartition(ImportTableDesc tblDesc,
+      ReplicationSpec replicationSpec) {
+    return (replicationSpec != null) && replicationSpec.isInReplicationScope()
+        && tblDesc.isExternal();
+  }
+
   public static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec,
       Logger logger)
       throws IOException, SemanticException {
@@ -976,7 +1016,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+                tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -1006,7 +1046,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if (isPartitioned(tblDesc)) {
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-          t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
+          t.addDependentTask(addSinglePartition(tblDesc, table, wh, addPartitionDesc,
             replicationSpec, x, writeId, stmtId));
         }
       } else {
@@ -1056,10 +1096,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       ImportTableDesc tblDesc,
       List<AddPartitionDesc> partitionDescs,
       ReplicationSpec replicationSpec, boolean waitOnPrecursor,
-      Table table, URI fromURI, FileSystem fs, Warehouse wh,
+      Table table, URI fromURI, Warehouse wh,
       EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId,
       UpdatedMetaDataTracker updatedMetadata)
-      throws HiveException, URISyntaxException, IOException, MetaException {
+      throws HiveException, IOException, MetaException {
 
     Task<?> dropTblTask = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
@@ -1121,7 +1161,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         tblDesc.setLocation(
             wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal())
         ).toString());
-
       }
     }
 
@@ -1147,7 +1186,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+                addSinglePartition(tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
+                    writeId, stmtId));
             if (updatedMetadata != null) {
               updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
                       addPartitionDesc.getPartition(0).getPartSpec());
@@ -1200,14 +1240,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           if (ptn == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+                  tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
               if (updatedMetadata != null) {
                 updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
                         addPartitionDesc.getPartition(0).getPartSpec());
               }
             } else {
               x.getTasks().add(alterSinglePartition(
-                      fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
+                  tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
               if (updatedMetadata != null) {
                 updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
                         addPartitionDesc.getPartition(0).getPartSpec());
@@ -1219,10 +1259,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             if (replicationSpec.allowReplacementInto(ptn.getParameters())){
               if (!replicationSpec.isMetadataOnly()){
                 x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+                    tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
               } else {
                 x.getTasks().add(alterSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
+                    tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
               }
               if (updatedMetadata != null) {
                 updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
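
The net effect of the ImportSemanticAnalyzer changes above: partitions of a replicated
external table no longer point into the dump directory but are re-rooted under the
external-table base dir on the target, while a managed table being migrated to an
external table gets its partitions under the table's new default location. A minimal,
self-contained sketch of that decision in plain Java, with strings standing in for Hive's
Path/ImportTableDesc objects; externalTableTargetLocation is only a hypothetical stand-in
for ReplExternalTables.externalTableLocation, assumed to re-root the source path the same
way dirLocationsToCopy (in ReplicationSemanticAnalyzer, further below) builds its copy
targets:

import java.net.URI;

/**
 * Illustrative only: the three partition-location outcomes visible in
 * getBaseAddPartitionDescFromPartition / fixLocationInPartSpec above, reduced to strings.
 */
public class ReplPartitionLocationSketch {

  /** Hypothetical stand-in for ReplExternalTables.externalTableLocation: re-root the
   *  source partition path under the replica's external-table base dir. */
  static String externalTableTargetLocation(String baseDir, String sourceLocation) {
    return URI.create(baseDir).getPath() + URI.create(sourceLocation).getPath();
  }

  static String choosePartitionLocation(boolean inReplicationScope, boolean isExternal,
      boolean migratingToExternal, String baseDir, String sourcePartitionLocation,
      String targetTableLocation, String partName, String dumpDirPartitionPath) {
    if (inReplicationScope && isExternal && !migratingToExternal) {
      // external table stays external: point the partition at the re-rooted data directory
      return externalTableTargetLocation(baseDir, sourcePartitionLocation);
    }
    if (inReplicationScope && isExternal && migratingToExternal) {
      // managed table on the source becoming external on the target:
      // the table's default location on the target plus the partition path
      return targetTableLocation + "/" + partName;
    }
    // otherwise keep the pre-patch behaviour: load from the dump directory
    return dumpDirPartitionPath;
  }

  public static void main(String[] args) {
    System.out.println(choosePartitionLocation(true, true, false,
        "/replica/ext_base", "hdfs://source:8020/data/sales/dt=2019-01-01",
        null, null, null));
    // prints /replica/ext_base/data/sales/dt=2019-01-01
  }
}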

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 1ebbb82..4e7595c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -37,17 +36,21 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.net.URI;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
@@ -371,15 +374,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       DumpMetaData dmd = new DumpMetaData(loadPath, conf);
 
       boolean evDump = false;
-      if (dmd.isIncrementalDump()){
+      // we also decide here which HDFS locations need to be copied over.
+      if (dmd.isIncrementalDump()) {
         LOG.debug("{} contains an incremental dump", loadPath);
         evDump = true;
       } else {
         LOG.debug("{} contains an bootstrap dump", loadPath);
       }
-
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-              tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo());
+          tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo(),
+          dirLocationsToCopy(loadPath, evDump));
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -387,6 +391,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
+  private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase)
+      throws HiveException, IOException {
+    List<DirCopyWork> list = new ArrayList<>();
+    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+    // this is done to remove any scheme related information that will be present in the base path
+    // specifically when we are replicating to cloud storage
+    Path basePath = new Path(baseDir);
+
+    for (String location : new Reader(conf, loadPath, isIncrementalPhase).sourceLocationsToCopy()) {
+      Path sourcePath = new Path(location);
+      String targetPathWithoutSchemeAndAuth = basePath.toUri().getPath() + sourcePath.toUri().getPath();
+      Path fullyQualifiedTargetUri = PathBuilder.fullyQualifiedHDFSUri(
+          new Path(targetPathWithoutSchemeAndAuth),
+          basePath.getFileSystem(conf)
+      );
+      list.add(new DirCopyWork(sourcePath, fullyQualifiedTargetUri));
+    }
+    return list;
+  }
+
   private void setConfigs(ASTNode node) throws SemanticException {
     Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
     if (null != replConfigs) {
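
The path arithmetic in dirLocationsToCopy above deserves a note: the configured base dir
may itself carry a scheme and authority (for instance when the replica lives on cloud
storage), so only its path component is kept, the source path is appended to it, and the
result is then qualified against the base dir's file system. A sketch of just that
composition, using only java.net.URI and leaving the FileSystem qualification as a
comment (illustrative only, not the Hive API):

import java.net.URI;

/** Illustrative only: mirrors how dirLocationsToCopy maps a source external-table
 *  directory onto the replica under REPL_EXTERNAL_TABLE_BASE_DIR. */
public class ExternalDirMappingSketch {

  static String targetDirWithoutSchemeAndAuthority(String baseDir, String sourceLocation) {
    String basePath = URI.create(baseDir).getPath();          // e.g. "/ext_base"
    String sourcePath = URI.create(sourceLocation).getPath(); // e.g. "/data/sales"
    // the real code then fully qualifies this against the base dir's FileSystem
    // via PathBuilder.fullyQualifiedHDFSUri(...)
    return basePath + sourcePath;
  }

  public static void main(String[] args) {
    System.out.println(targetDirWithoutSchemeAndAuthority(
        "s3a://bucket/ext_base", "hdfs://source-nn:8020/data/sales"));
    // prints /ext_base/data/sales; qualified against the base dir's file system this
    // becomes s3a://bucket/ext_base/data/sales in this example
  }
}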

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 39009ce..b087831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -48,6 +48,7 @@ public class ReplicationSpec {
   private String validTxnList = null;
   private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
   private boolean isMigratingToTxnTable = false;
+  private boolean isMigratingToExternalTable = false;
 
   // Key definitions related to replication
   public enum KEY {
@@ -410,4 +411,12 @@ public class ReplicationSpec {
   public void setMigratingToTxnTable() {
     isMigratingToTxnTable = true;
   }
+
+  public boolean isMigratingToExternalTable() {
+    return isMigratingToExternalTable;
+  }
+
+  public void setMigratingToExternalTable() {
+    isMigratingToExternalTable = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index fe0eaf8..686fe7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -49,7 +49,7 @@ public class CopyUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
   // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
-  private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
+  public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
   private static final int MAX_COPY_RETRY = 5;
 
   private final HiveConf hiveConf;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 83a9642..21df63c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -182,12 +182,13 @@ public class Utils {
     }
 
     if (replicationSpec.isInReplicationScope()) {
-      if (!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
-              MetaStoreUtils.isExternalTable(tableHandle.getTTable()) && !replicationSpec.isMetadataOnly()) {
+      if (tableHandle.isTemporary()) {
         return false;
       }
 
-      return !tableHandle.isTemporary();
+      if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) {
+        return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) || replicationSpec.isMetadataOnly();
+      }
     }
     return true;
   }
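
Ignoring the earlier checks in shouldReplicate that sit outside this hunk, the
replication-scope rule above now reads: temporary tables are never replicated, and
external tables are replicated only when REPL_INCLUDE_EXTERNAL_TABLES is enabled or the
operation is metadata-only. A self-contained restatement in plain booleans (illustrative
only, not the Hive API):

/** Illustrative only: the dump-side inclusion rule for a table inside replication scope. */
public class ShouldReplicateSketch {

  static boolean shouldReplicate(boolean inReplicationScope, boolean isTemporary,
      boolean isExternal, boolean includeExternalTables, boolean metadataOnly) {
    if (inReplicationScope) {
      if (isTemporary) {
        return false;
      }
      if (isExternal) {
        return includeExternalTables || metadataOnly;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // external table with REPL_INCLUDE_EXTERNAL_TABLES on -> included in the dump
    System.out.println(shouldReplicate(true, false, true, true, false));  // true
    // external table with the flag off and a full (non metadata-only) dump -> skipped
    System.out.println(shouldReplicate(true, false, true, false, false)); // false
  }
}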

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index 7d17de2..f8a9ace 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -18,12 +18,11 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
+
 class DropTableHandler extends AbstractEventHandler<DropTableMessage> {
 
   DropTableHandler(NotificationEvent event) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 842e20a..f029fee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -53,6 +54,9 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
       return;
     }
     org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(eventMessage);
+    if (TableType.EXTERNAL_TABLE.equals(qlMdTable.getTableType())) {
+      withinContext.replicationSpec.setNoop(true);
+    }
 
     if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
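
The guard added above marks the event's replication spec as a noop for external tables
just before the shouldReplicate check, so insert events on external tables contribute
nothing to the dump; their data travels through the external-table directory copy
instead. A minimal sketch of that flow with stand-in types (not the Hive classes, and
with the assumption that a noop spec is rejected by the inclusion check):

/** Illustrative only: a noop replication spec causes the inclusion check to drop the
 *  event, so inserts into external tables add nothing to the dump. */
public class InsertEventNoopSketch {

  static final class Spec {              // stand-in for ReplicationSpec
    private boolean noop;
    void setNoop(boolean noop) { this.noop = noop; }
    boolean isNoop() { return noop; }
  }

  /** Stand-in for Utils.shouldReplicate, assumed to reject noop specs. */
  static boolean shouldReplicate(Spec spec) {
    return !spec.isNoop();
  }

  static String handleInsert(Spec spec, boolean isExternalTable) {
    if (isExternalTable) {
      spec.setNoop(true);                // same intent as the guard in InsertHandler
    }
    if (!shouldReplicate(spec)) {
      return "skipped";                  // nothing to dump for this event
    }
    return "dumped";
  }

  public static void main(String[] args) {
    System.out.println(handleInsert(new Spec(), true));   // skipped
    System.out.println(handleInsert(new Spec(), false));  // dumped
  }
}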

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index ecde3ce..ecd4c84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -58,10 +58,6 @@ public class PartitionSerializer implements JsonWriter.Serializer {
                   ReplicationSpec.KEY.CURR_STATE_ID.toString(),
                   additionalPropertiesProvider.getCurrentReplicationState());
         }
-        if (isPartitionExternal()) {
-          // Replication destination will not be external
-          partition.putToParameters("EXTERNAL", "FALSE");
-        }
       }
       writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8));
       writer.jsonGenerator.flush();