Posted to commits@hive.apache.org by an...@apache.org on 2020/07/27 08:19:00 UTC

[hive] branch master updated: HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e34acf5  HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
e34acf5 is described below

commit e34acf5c677a23af0053ac98532a9caa9e190b6c
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Jul 27 13:48:47 2020 +0530

    HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  2 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  4 +-
 .../TestReplicationScenariosAcrossInstances.java   | 78 ++++++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  1 +
 .../org/apache/hadoop/hive/ql/exec/CopyTask.java   |  2 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 27 +++++---
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     | 32 ++++++++-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  1 +
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  | 10 +--
 .../hive/ql/parse/repl/dump/PartitionExport.java   |  8 +--
 .../hive/ql/parse/repl/dump/TableExport.java       |  4 +-
 .../repl/dump/events/CreateFunctionHandler.java    | 42 +++++++++++-
 .../ql/parse/repl/dump/io/FunctionSerializer.java  | 23 ++++++-
 .../repl/load/message/CreateFunctionHandler.java   | 16 +++--
 .../org/apache/hadoop/hive/ql/plan/CopyWork.java   | 15 +++++
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |  5 +-
 .../queries/clientpositive/repl_2_exim_basic.q     |  1 +
 17 files changed, 232 insertions(+), 39 deletions(-)
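
In short: REPL DUMP now stages function binaries under the dump (staging) directory and schedules copy tasks for them, unless hive.repl.data.copy.lazy is set, in which case only the CM-encoded source URI is recorded and the copy is deferred to REPL LOAD. The same patch flips hive.repl.dump.metadata.only.for.external.table to true by default. A minimal sketch of the two knobs, using only conf vars that appear below (values illustrative):

    HiveConf conf = new HiveConf();
    // defer function binary (and table data) copy to REPL LOAD
    conf.setBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY, true);
    // opt back in to dumping external table data, not just metadata
    conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE, false);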

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2a32d89..d623311 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -542,7 +542,7 @@ public class HiveConf extends Configuration {
         "Indicates whether replication dump can skip copyTask and refer to  \n"
             + " original path instead. This would retain all table and partition meta"),
     REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table",
-            false,
+            true,
             "Indicates whether external table replication dump only metadata information or data + metadata"),
     REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false,
         "Indicates if repl dump should bootstrap the information about ACID tables along with \n"
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 66b0d07..b8e91dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,10 +395,10 @@ public class TestReplicationScenarios {
     String replicatedDbName = dbName + "_dupe";
 
 
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, true);
     verifyFail("REPL DUMP " + dbName, driver);
     advanceDumpDir();
-    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false);
+    EximUtil.DataCopyPath.setNullSrcPath(hconf, false);
     Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
     advanceDumpDir();
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 60074ae..2953c22 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -116,6 +116,79 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   }
 
   @Test
+  public void testCreateFunctionOnHDFSIncrementalReplication() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+            + ".idFunc1 as 'IdentityStringUDF' "
+            + "using jar  '" + identityUdf1HdfsPath.toString() + "'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId)
+            .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+            .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+            .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+            .verifyResults(new String[] { "MyName"});
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+            + ".idFunc2 as 'IdentityStringUDF' "
+            + "using jar  '" + identityUdf2HdfsPath.toString() + "'");
+
+    WarehouseInstance.Tuple incrementalDump =
+            primary.dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+            .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+                    replicatedDbName + ".idFunc2" })
+            .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+            .verifyResults(new String[] { "YourName"});
+  }
+
+  @Test
+  public void testCreateFunctionOnHDFSIncrementalReplicationLazyCopy() throws Throwable {
+    Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+    Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+    Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+    setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+    List<String> withClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+            + ".idFunc1 as 'IdentityStringUDF' "
+            + "using jar  '" + identityUdf1HdfsPath.toString() + "'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId)
+            .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+            .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+            .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+            .verifyResults(new String[] { "MyName"});
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+            + ".idFunc2 as 'IdentityStringUDF' "
+            + "using jar  '" + identityUdf2HdfsPath.toString() + "'");
+
+    WarehouseInstance.Tuple incrementalDump =
+            primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+            .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+                    replicatedDbName + ".idFunc2" })
+            .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+            .verifyResults(new String[] { "YourName"});
+  }
+
+  @Test
   public void testBootstrapReplLoadRetryAfterFailureForFunctions() throws Throwable {
     String funcName1 = "f1";
     String funcName2 = "f2";
@@ -1685,4 +1758,9 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   private String quote(String str) {
     return "'" + str + "'";
   }
+
+  private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPath) throws IOException {
+    FileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
+  }
 }
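
With lazy copy off, the new tests implicitly pin down the dump layout for function binaries: each jar is staged under the dump's data tree (the functions directory name comes from ReplUtils.FUNCTIONS_ROOT_DIR_NAME; its value is not shown in this patch), roughly:

    <dumpRoot>/data/<dbName>/<functions-root>/idFunc1/identity_udf1.jar
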
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0a7d5a0..1bef351 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -141,6 +141,7 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot);
     hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
+    hiveConf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE, false);
     hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
     hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index c11f582..5ffc110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -91,7 +91,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
         Utilities.FILE_OP_LOGGER.debug("Copying file {} to {}", oneSrcPathStr, toPath);
         if (!FileUtils.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
             false, // delete source
-            true, // overwrite destination
+            work.isOverwrite(), // overwrite destination
             conf)) {
           console.printError("Failed to copy: '" + oneSrcPathStr
               + "to: '" + toPath.toString() + "'");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bb0ae1f..402c87e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -154,7 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   public int execute() {
     try {
       SecurityUtils.reloginExpiringKeytabUser();
-      if (work.tableDataCopyIteratorsInitialized()) {
+      if (work.dataCopyIteratorsInitialized()) {
         initiateDataCopyTasks();
       } else {
         Path dumpRoot = getEncodedDumpRootPath();
@@ -178,6 +178,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           ReplChangeManager.getInstance(conf);
           Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
           Long lastReplId;
+          LOG.info("Data copy at load enabled: {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
           if (isBootstrap) {
             lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
           } else {
@@ -253,6 +254,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
     childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
+    childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf));
     if (childTasks.isEmpty()) {
       //All table data copy work finished.
       finishRemainingTasks();
@@ -792,6 +794,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       //We can't reuse the previous write id as it might be invalid due to compaction
       metadataPath.getFileSystem(conf).delete(metadataPath, true);
     }
+    List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = Collections.emptyList();
     int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
     try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
          FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) {
@@ -818,12 +821,12 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         work.getMetricCollector().reportStageStart(getName(), metricMap);
         Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
         Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
-        dumpFunctionMetadata(dbName, dbRoot, hiveDb);
+        boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+        functionsBinaryCopyPaths = dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb, dataCopyAtLoad);
 
         String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
         Exception caught = null;
         try (Writer writer = new Writer(dbRoot, conf)) {
-          List<Path> extTableLocations = new LinkedList<>();
           for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
             LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
             Table table = null;
@@ -836,7 +839,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
                 LOG.debug("Adding table {} to external tables list", tblName);
                 writer.dataLocationDump(tableTuple.object, extTableFileList, conf);
               }
-              boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
               dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
                       bootDumpBeginReplId,
                       hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
@@ -878,6 +880,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
       dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
       dmd.write(true);
+      work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
       setDataCopyIterators(extTableFileList, managedTblList);
       return bootDumpBeginReplId;
     }
@@ -1108,24 +1111,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return null;
   }
 
-  void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
-    Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+  List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+                                                             Hive hiveDb, boolean copyAtLoad) throws Exception {
+    List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = new ArrayList<>();
+    Path functionsMetaRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+    Path functionsDataRoot = new Path(dbDataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
     List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
       HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
       if (tuple == null) {
         continue;
       }
-      Path functionRoot = new Path(functionsRoot, functionName);
-      Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionMetaRoot = new Path(functionsMetaRoot, functionName);
+      Path functionMetadataFile = new Path(functionMetaRoot, FUNCTION_METADATA_FILE_NAME);
+      Path functionDataRoot = new Path(functionsDataRoot, functionName);
       try (JsonWriter jsonWriter =
           new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
-        FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+        FunctionSerializer serializer = new FunctionSerializer(tuple.object, functionDataRoot, copyAtLoad, conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
+        functionsBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
       }
       work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1);
       replLogger.functionLog(functionName);
     }
+    return functionsBinaryCopyPaths;
   }
 
   void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
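
Net effect of the ReplDumpTask changes, condensed (all names from this patch): dumpFunctionMetadata now returns the source/target pairs for each function binary, and bootStrapDump hands them to the copy phase through the work object:

    List<EximUtil.DataCopyPath> functionsBinaryCopyPaths =
        dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb, dataCopyAtLoad);
    // consumed later by initiateDataCopyTasks() via functionsBinariesCopyTasks()
    work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
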
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 21caf44..64b9dd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -57,6 +57,7 @@ public class ReplDumpWork implements Serializable {
   private Integer maxEventLimit;
   private transient Iterator<String> externalTblCopyPathIterator;
   private transient Iterator<String> managedTblCopyPathIterator;
+  private transient Iterator<EximUtil.DataCopyPath> functionCopyPathIterator;
   private Path currentDumpPath;
   private List<String> resultValues;
   private boolean shouldOverwrite;
@@ -147,8 +148,17 @@ public class ReplDumpWork implements Serializable {
     this.managedTblCopyPathIterator = managedTblCopyPathIterator;
   }
 
-  public boolean tableDataCopyIteratorsInitialized() {
-    return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null;
+  public void setFunctionCopyPathIterator(Iterator<EximUtil.DataCopyPath> functionCopyPathIterator) {
+    if (this.functionCopyPathIterator != null) {
+      throw new IllegalStateException("Function copy path iterator has already been initialized");
+    }
+    this.functionCopyPathIterator = functionCopyPathIterator;
+  }
+
+  public boolean dataCopyIteratorsInitialized() {
+    return externalTblCopyPathIterator != null
+            || managedTblCopyPathIterator != null
+            || functionCopyPathIterator != null;
   }
 
   public Path getCurrentDumpPath() {
@@ -192,7 +202,7 @@ public class ReplDumpWork implements Serializable {
       ReplicationSpec replSpec = new ReplicationSpec();
       replSpec.setIsReplace(true);
       replSpec.setInReplicationScope(true);
-      EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec);
+      EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec);
       managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
       Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
               managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
@@ -204,6 +214,22 @@ public class ReplDumpWork implements Serializable {
     return tasks;
   }
 
+  public List<Task<?>> functionsBinariesCopyTasks(TaskTracker tracker, HiveConf conf) {
+    List<Task<?>> tasks = new ArrayList<>();
+    if (functionCopyPathIterator != null) {
+      while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+        EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
+        Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+                binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf
+        );
+        tasks.add(copyTask);
+        tracker.addTask(copyTask);
+        LOG.debug("added task for {}", binaryCopyPath);
+      }
+    }
+    return tasks;
+  }
+
   public void setShouldOverwrite(boolean shouldOverwrite) {
     this.shouldOverwrite = shouldOverwrite;
   }
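
With the iterator in place, the copy phase drains three task sources under one TaskTracker budget; this is the call sequence from ReplDumpTask.initiateDataCopyTasks earlier in this patch:

    childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
    childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
    childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf));
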
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 5ac9a05..3c3dc44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -119,6 +119,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       if (shouldLoadAuthorizationMetadata()) {
         initiateAuthorizationLoadTask();
       }
+      LOG.info("Data copy at load enabled: {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
       if (work.isIncrementalLoad()) {
         return executeIncrementalLoad();
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index b35f7ab..7d39f8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -162,20 +162,20 @@ public class EximUtil {
   }
 
   /**
-   * Wrapper class for mapping source and target path for copying managed table data.
+   * Wrapper class for mapping source and target path for copying managed table data and function's binary.
    */
-  public static class ManagedTableCopyPath implements StringConvertibleObject {
+  public static class DataCopyPath implements StringConvertibleObject {
     private static final String URI_SEPARATOR = "#";
     private ReplicationSpec replicationSpec;
     private static boolean nullSrcPathForTest = false;
     private Path srcPath;
     private Path tgtPath;
 
-    public ManagedTableCopyPath(ReplicationSpec replicationSpec) {
+    public DataCopyPath(ReplicationSpec replicationSpec) {
       this.replicationSpec = replicationSpec;
     }
 
-    public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+    public DataCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
       this.replicationSpec = replicationSpec;
       if (srcPath == null) {
         throw new IllegalArgumentException("Source path can not be null.");
@@ -200,7 +200,7 @@ public class EximUtil {
 
     @Override
     public String toString() {
-      return "ManagedTableCopyPath{"
+      return "DataCopyPath{"
               + "fullyQualifiedSourcePath=" + srcPath
               + ", fullyQualifiedTargetPath=" + tgtPath
               + '}';
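
DataCopyPath (formerly ManagedTableCopyPath) still round-trips through the dump's file list as a string; a sketch using only methods visible in this patch (replSpec, src and tgt are placeholders):

    EximUtil.DataCopyPath out = new EximUtil.DataCopyPath(replSpec, src, tgt);
    String line = out.convertToString();   // appended to the dump's file list
    EximUtil.DataCopyPath in = new EximUtil.DataCopyPath(replSpec);
    in.loadFromString(line);               // rebuilt when copy tasks are created
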
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 65d4fbf..aad34d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
@@ -75,11 +75,11 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+  List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
                                    FileList fileList, boolean dataCopyAtLoad)
           throws InterruptedException, HiveException {
     List<Future<?>> futures = new LinkedList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
+    List<DataCopyPath> managedTableCopyPaths = new LinkedList<>();
     ExecutorService producer = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     futures.add(producer.submit(() -> {
@@ -126,7 +126,7 @@ class PartitionExport {
           Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
           if (!(isExportTask || dataCopyAtLoad)) {
-            fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+            fileList.add(new DataCopyPath(forReplicationSpec, partition.getDataLocation(),
                     dataDumpDir).convertToString());
           }
         } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 66cf494..1465b8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -174,7 +174,7 @@ public class TableExport {
         List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
                 replicationSpec, conf);
         if (!(isExportTask || dataCopyAtLoad)) {
-          fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+          fileList.add(new DataCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
                   paths.dataExportDir()).convertToString());
         }
         new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index c9e1041..69671b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -20,13 +20,25 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
@@ -41,13 +53,37 @@ class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage>
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
     Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
     FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
-
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    List<DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
     try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
-      new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
-          .writeTo(jsonWriter, withinContext.replicationSpec);
+      FunctionSerializer serializer = new FunctionSerializer(eventMessage.getFunctionObj(),
+              dataPath, copyAtLoad, withinContext.hiveConf);
+      serializer.writeTo(jsonWriter, withinContext.replicationSpec);
+      functionBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
     }
     withinContext.createDmd(this).write();
+    copyFunctionBinaries(functionBinaryCopyPaths, withinContext.hiveConf);
+  }
+
+  private void copyFunctionBinaries(List<DataCopyPath> functionBinaryCopyPaths, HiveConf hiveConf)
+          throws MetaException, IOException, LoginException, HiveFatalException {
+    if (!functionBinaryCopyPaths.isEmpty()) {
+      String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      for (DataCopyPath funcBinCopyPath : functionBinaryCopyPaths) {
+        String [] decodedURISplits = ReplChangeManager.decodeFileUri(funcBinCopyPath.getSrcPath().toString());
+        ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+                decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
+        filePaths.add(fileInfo);
+        Path destRoot = funcBinCopyPath.getTargetPath().getParent();
+        FileSystem dstFs = destRoot.getFileSystem(hiveConf);
+        CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+        copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), false);
+        copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
+      }
+    }
   }
 
   @Override
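
For incremental dumps, the event handler above copies binaries eagerly unless lazy copy is on; the decode/copy step condensed (the meaning of the four decoded parts is inferred from how encodeFileUri and getFileInfo pair up in ReplChangeManager):

    String[] parts = ReplChangeManager.decodeFileUri(copyPath.getSrcPath().toString());
    // parts: [source path, checksum, cm root uri, sub dir] (inferred)
    ReplChangeManager.FileInfo info =
        ReplChangeManager.getFileInfo(new Path(parts[0]), parts[1], parts[2], parts[3], hiveConf);
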
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 733bab5..2e87267 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
@@ -40,10 +41,15 @@ public class FunctionSerializer implements JsonWriter.Serializer {
   public static final String FIELD_NAME = "function";
   private Function function;
   private HiveConf hiveConf;
+  private Path functionDataRoot;
+  private boolean copyAtLoad;
+  private List<EximUtil.DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
 
-  public FunctionSerializer(Function function, HiveConf hiveConf) {
+  public FunctionSerializer(Function function, Path functionDataRoot, boolean copyAtLoad, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     this.function = function;
+    this.functionDataRoot = functionDataRoot;
+    this.copyAtLoad = copyAtLoad;
   }
 
   @Override
@@ -58,9 +64,16 @@ public class FunctionSerializer implements JsonWriter.Serializer {
           FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
           Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
           String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
-          String newFileUri = ReplChangeManager.getInstance(hiveConf)
+          String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
                   .encodeFileUri(qualifiedUri.toString(), checkSum, null);
-          resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
+          if (copyAtLoad) {
+            resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
+          } else {
+            Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
+            resourceUris.add(new ResourceUri(uri.getResourceType(), newBinaryPath.toString()));
+            functionBinaryCopyPaths.add(new EximUtil.DataCopyPath(additionalPropertiesProvider,
+                    new Path(encodedSrcUri), newBinaryPath));
+          }
         } else {
           resourceUris.add(uri);
         }
@@ -84,4 +97,8 @@ public class FunctionSerializer implements JsonWriter.Serializer {
       throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
     }
   }
+
+  public List<EximUtil.DataCopyPath> getFunctionBinaryCopyPaths() {
+    return functionBinaryCopyPaths;
+  }
 }
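
A hypothetical caller, mirroring ReplDumpTask.dumpFunctionMetadata earlier in this patch, to show the serializer's contract:

    FunctionSerializer serializer =
        new FunctionSerializer(function, functionDataRoot, copyAtLoad, conf);
    serializer.writeTo(jsonWriter, replicationSpec);
    // empty when copyAtLoad == true; otherwise one entry per CM-encoded jar URI
    List<EximUtil.DataCopyPath> toCopy = serializer.getFunctionBinaryCopyPaths();
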
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 948d201..f42290e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 
 import java.io.IOException;
@@ -193,15 +194,20 @@ public class CreateFunctionHandler extends AbstractMessageHandler {
           new Path(functionsRootDir).getFileSystem(context.hiveConf)
       );
 
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
-          metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
-          context.hiveConf
-      );
-      replCopyTasks.add(copyTask);
+      replCopyTasks.add(getCopyTask(sourceUri, qualifiedDestinationPath));
       ResourceUri destinationUri =
           new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
       context.log.debug("copy source uri : {} to destination uri: {}", sourceUri, destinationUri);
       return destinationUri;
     }
+
+    private Task<?> getCopyTask(String sourceUri, Path dest) {
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+      if (copyAtLoad) {
+        return ReplCopyTask.getLoadCopyTask(metadata.getReplicationSpec(), new Path(sourceUri), dest, context.hiveConf);
+      } else {
+        return TaskFactory.get(new CopyWork(new Path(sourceUri), dest, true, false), context.hiveConf);
+      }
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 018983f..f69776a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,6 +33,7 @@ public class CopyWork implements Serializable {
   private Path[] fromPath;
   private Path[] toPath;
   private boolean errorOnSrcEmpty;
+  private boolean overwrite = true;
 
   public CopyWork() {
   }
@@ -42,6 +43,12 @@ public class CopyWork implements Serializable {
     this.setErrorOnSrcEmpty(errorOnSrcEmpty);
   }
 
+  public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty, boolean overwrite) {
+    this(new Path[] { fromPath }, new Path[] { toPath });
+    this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+    this.setOverwrite(overwrite);
+  }
+
   public CopyWork(final Path[] fromPath, final Path[] toPath) {
     if (fromPath.length != toPath.length) {
       throw new RuntimeException(
@@ -87,4 +94,12 @@ public class CopyWork implements Serializable {
   public boolean isErrorOnSrcEmpty() {
     return errorOnSrcEmpty;
   }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
 }
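
The new four-argument CopyWork constructor pairs with the CopyTask change earlier in this patch: the final flag decides whether an existing destination file is overwritten. A sketch (src, dst and conf are placeholders):

    // errorOnSrcEmpty = true, overwrite = false: keep an existing destination file
    CopyWork work = new CopyWork(src, dst, true, false);
    Task<?> copyTask = TaskFactory.get(work, conf);
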
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index a6def15..3f47678 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -87,7 +88,9 @@ public class TestReplDumpTask {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
+    List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+                                                     Hive hiveDb, boolean copyAtLoad) {
+      return Collections.emptyList();
     }
 
     @Override
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index b9119a9..4c18223 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -3,6 +3,7 @@ set hive.test.mode=true;
 set hive.test.mode.prefix=;
 set hive.test.mode.nosamplelist=managed_t,ext_t,managed_t_imported,managed_t_r_imported,ext_t_imported,ext_t_r_imported;
 set hive.repl.include.external.tables=true;
+set hive.repl.dump.metadata.only.for.external.table=false;
 
 drop table if exists managed_t;
 drop table if exists ext_t;