You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/07/27 08:19:00 UTC
[hive] branch master updated: HIVE-23835: Repl Dump should dump
function binaries to staging directory (Pravin Kumar Sinha,
reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e34acf5 HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
e34acf5 is described below
commit e34acf5c677a23af0053ac98532a9caa9e190b6c
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Jul 27 13:48:47 2020 +0530
HIVE-23835: Repl Dump should dump function binaries to staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/ql/parse/TestReplicationScenarios.java | 4 +-
.../TestReplicationScenariosAcrossInstances.java | 78 ++++++++++++++++++++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 1 +
.../org/apache/hadoop/hive/ql/exec/CopyTask.java | 2 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 27 +++++---
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 32 ++++++++-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 1 +
.../org/apache/hadoop/hive/ql/parse/EximUtil.java | 10 +--
.../hive/ql/parse/repl/dump/PartitionExport.java | 8 +--
.../hive/ql/parse/repl/dump/TableExport.java | 4 +-
.../repl/dump/events/CreateFunctionHandler.java | 42 +++++++++++-
.../ql/parse/repl/dump/io/FunctionSerializer.java | 23 ++++++-
.../repl/load/message/CreateFunctionHandler.java | 16 +++--
.../org/apache/hadoop/hive/ql/plan/CopyWork.java | 15 +++++
.../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 5 +-
.../queries/clientpositive/repl_2_exim_basic.q | 1 +
17 files changed, 232 insertions(+), 39 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2a32d89..d623311 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -542,7 +542,7 @@ public class HiveConf extends Configuration {
"Indicates whether replication dump can skip copyTask and refer to \n"
+ " original path instead. This would retain all table and partition meta"),
REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE("hive.repl.dump.metadata.only.for.external.table",
- false,
+ true,
"Indicates whether external table replication dump only metadata information or data + metadata"),
REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false,
"Indicates if repl dump should bootstrap the information about ACID tables along with \n"
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 66b0d07..b8e91dd 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,10 +395,10 @@ public class TestReplicationScenarios {
String replicatedDbName = dbName + "_dupe";
- EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true);
+ EximUtil.DataCopyPath.setNullSrcPath(hconf, true);
verifyFail("REPL DUMP " + dbName, driver);
advanceDumpDir();
- EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false);
+ EximUtil.DataCopyPath.setNullSrcPath(hconf, false);
Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
advanceDumpDir();
FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 60074ae..2953c22 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -116,6 +116,79 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
}
@Test
+ public void testCreateFunctionOnHDFSIncrementalReplication() throws Throwable {
+ Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+ Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+ Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+ setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+ setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".idFunc1 as 'IdentityStringUDF' "
+ + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+ WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(bootStrapDump.lastReplicationId)
+ .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+ .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+ .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+ .verifyResults(new String[] { "MyName"});
+
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".idFunc2 as 'IdentityStringUDF' "
+ + "using jar '" + identityUdf2HdfsPath.toString() + "'");
+
+ WarehouseInstance.Tuple incrementalDump =
+ primary.dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(incrementalDump.lastReplicationId)
+ .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+ .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+ replicatedDbName + ".idFunc2" })
+ .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+ .verifyResults(new String[] { "YourName"});
+ }
+
+ @Test
+ public void testCreateFunctionOnHDFSIncrementalReplicationLazyCopy() throws Throwable {
+ Path identityUdfLocalPath = new Path("../../data/files/identity_udf.jar");
+ Path identityUdf1HdfsPath = new Path(primary.functionsRoot, "idFunc1" + File.separator + "identity_udf1.jar");
+ Path identityUdf2HdfsPath = new Path(primary.functionsRoot, "idFunc2" + File.separator + "identity_udf2.jar");
+ setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf1HdfsPath);
+ setupUDFJarOnHDFS(identityUdfLocalPath, identityUdf2HdfsPath);
+ List<String> withClasuse = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".idFunc1 as 'IdentityStringUDF' "
+ + "using jar '" + identityUdf1HdfsPath.toString() + "'");
+ WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, withClasuse);
+ replica.load(replicatedDbName, primaryDbName, withClasuse)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(bootStrapDump.lastReplicationId)
+ .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+ .verifyResults(new String[] { replicatedDbName + ".idFunc1"})
+ .run("SELECT " + replicatedDbName + ".idFunc1('MyName')")
+ .verifyResults(new String[] { "MyName"});
+
+ primary.run("CREATE FUNCTION " + primaryDbName
+ + ".idFunc2 as 'IdentityStringUDF' "
+ + "using jar '" + identityUdf2HdfsPath.toString() + "'");
+
+ WarehouseInstance.Tuple incrementalDump =
+ primary.dump(primaryDbName, withClasuse);
+ replica.load(replicatedDbName, primaryDbName, withClasuse)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(incrementalDump.lastReplicationId)
+ .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
+ .verifyResults(new String[] { replicatedDbName + ".idFunc1",
+ replicatedDbName + ".idFunc2" })
+ .run("SELECT " + replicatedDbName + ".idFunc2('YourName')")
+ .verifyResults(new String[] { "YourName"});
+ }
+
+ @Test
public void testBootstrapReplLoadRetryAfterFailureForFunctions() throws Throwable {
String funcName1 = "f1";
String funcName2 = "f2";
@@ -1685,4 +1758,9 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
private String quote(String str) {
return "'" + str + "'";
}
+
+ private void setupUDFJarOnHDFS(Path identityUdfLocalPath, Path identityUdfHdfsPath) throws IOException {
+ FileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.copyFromLocalFile(identityUdfLocalPath, identityUdfHdfsPath);
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0a7d5a0..1bef351 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -141,6 +141,7 @@ public class WarehouseInstance implements Closeable {
hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot);
hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
+ hiveConf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE, false);
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
"jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index c11f582..5ffc110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -91,7 +91,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable {
Utilities.FILE_OP_LOGGER.debug("Copying file {} to {}", oneSrcPathStr, toPath);
if (!FileUtils.copy(srcFs, oneSrc.getPath(), dstFs, toPath,
false, // delete source
- true, // overwrite destination
+ work.isOverwrite(), // overwrite destination
conf)) {
console.printError("Failed to copy: '" + oneSrcPathStr
+ "to: '" + toPath.toString() + "'");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bb0ae1f..402c87e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -154,7 +154,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
public int execute() {
try {
SecurityUtils.reloginExpiringKeytabUser();
- if (work.tableDataCopyIteratorsInitialized()) {
+ if (work.dataCopyIteratorsInitialized()) {
initiateDataCopyTasks();
} else {
Path dumpRoot = getEncodedDumpRootPath();
@@ -178,6 +178,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
ReplChangeManager.getInstance(conf);
Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
+ LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
if (isBootstrap) {
lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
} else {
@@ -253,6 +254,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
+ childTasks.addAll(work.functionsBinariesCopyTasks(taskTracker, conf));
if (childTasks.isEmpty()) {
//All table data copy work finished.
finishRemainingTasks();
@@ -792,6 +794,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
//We can't reuse the previous write id as it might be invalid due to compaction
metadataPath.getFileSystem(conf).delete(metadataPath, true);
}
+ List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = Collections.emptyList();
int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) {
@@ -818,12 +821,12 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
work.getMetricCollector().reportStageStart(getName(), metricMap);
Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
- dumpFunctionMetadata(dbName, dbRoot, hiveDb);
+ boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+ functionsBinaryCopyPaths = dumpFunctionMetadata(dbName, dbRoot, dbDataRoot, hiveDb, dataCopyAtLoad);
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
Exception caught = null;
try (Writer writer = new Writer(dbRoot, conf)) {
- List<Path> extTableLocations = new LinkedList<>();
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
Table table = null;
@@ -836,7 +839,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
LOG.debug("Adding table {} to external tables list", tblName);
writer.dataLocationDump(tableTuple.object, extTableFileList, conf);
}
- boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
bootDumpBeginReplId,
hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
@@ -878,6 +880,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
dmd.write(true);
+ work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
setDataCopyIterators(extTableFileList, managedTblList);
return bootDumpBeginReplId;
}
@@ -1108,24 +1111,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return null;
}
- void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
- Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+ List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+ Hive hiveDb, boolean copyAtLoad) throws Exception {
+ List<EximUtil.DataCopyPath> functionsBinaryCopyPaths = new ArrayList<>();
+ Path functionsMetaRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+ Path functionsDataRoot = new Path(dbDataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
List<String> functionNames = hiveDb.getFunctions(dbName, "*");
for (String functionName : functionNames) {
HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
if (tuple == null) {
continue;
}
- Path functionRoot = new Path(functionsRoot, functionName);
- Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
+ Path functionMetaRoot = new Path(functionsMetaRoot, functionName);
+ Path functionMetadataFile = new Path(functionMetaRoot, FUNCTION_METADATA_FILE_NAME);
+ Path functionDataRoot = new Path(functionsDataRoot, functionName);
try (JsonWriter jsonWriter =
new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
- FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+ FunctionSerializer serializer = new FunctionSerializer(tuple.object, functionDataRoot, copyAtLoad, conf);
serializer.writeTo(jsonWriter, tuple.replicationSpec);
+ functionsBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
}
work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1);
replLogger.functionLog(functionName);
}
+ return functionsBinaryCopyPaths;
}
void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiveDb) throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 21caf44..64b9dd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -57,6 +57,7 @@ public class ReplDumpWork implements Serializable {
private Integer maxEventLimit;
private transient Iterator<String> externalTblCopyPathIterator;
private transient Iterator<String> managedTblCopyPathIterator;
+ private transient Iterator<EximUtil.DataCopyPath> functionCopyPathIterator;
private Path currentDumpPath;
private List<String> resultValues;
private boolean shouldOverwrite;
@@ -147,8 +148,17 @@ public class ReplDumpWork implements Serializable {
this.managedTblCopyPathIterator = managedTblCopyPathIterator;
}
- public boolean tableDataCopyIteratorsInitialized() {
- return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null;
+ public void setFunctionCopyPathIterator(Iterator<EximUtil.DataCopyPath> functionCopyPathIterator) {
+ if (this.functionCopyPathIterator != null) {
+ throw new IllegalStateException("Function copy path iterator has already been initialized");
+ }
+ this.functionCopyPathIterator = functionCopyPathIterator;
+ }
+
+ public boolean dataCopyIteratorsInitialized() {
+ return externalTblCopyPathIterator != null
+ || managedTblCopyPathIterator != null
+ || functionCopyPathIterator != null;
}
public Path getCurrentDumpPath() {
@@ -192,7 +202,7 @@ public class ReplDumpWork implements Serializable {
ReplicationSpec replSpec = new ReplicationSpec();
replSpec.setIsReplace(true);
replSpec.setInReplicationScope(true);
- EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec);
+ EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec);
managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
@@ -204,6 +214,22 @@ public class ReplDumpWork implements Serializable {
return tasks;
}
+ public List<Task<?>> functionsBinariesCopyTasks(TaskTracker tracker, HiveConf conf) {
+ List<Task<?>> tasks = new ArrayList<>();
+ if (functionCopyPathIterator != null) {
+ while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+ EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf
+ );
+ tasks.add(copyTask);
+ tracker.addTask(copyTask);
+ LOG.debug("added task for {}", binaryCopyPath);
+ }
+ }
+ return tasks;
+ }
+
public void setShouldOverwrite(boolean shouldOverwrite) {
this.shouldOverwrite = shouldOverwrite;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 5ac9a05..3c3dc44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -119,6 +119,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
if (shouldLoadAuthorizationMetadata()) {
initiateAuthorizationLoadTask();
}
+ LOG.info("Data copy at load enabled : {}", conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY));
if (work.isIncrementalLoad()) {
return executeIncrementalLoad();
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index b35f7ab..7d39f8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -162,20 +162,20 @@ public class EximUtil {
}
/**
- * Wrapper class for mapping source and target path for copying managed table data.
+ * Wrapper class for mapping source and target path for copying managed table data and function's binary.
*/
- public static class ManagedTableCopyPath implements StringConvertibleObject {
+ public static class DataCopyPath implements StringConvertibleObject {
private static final String URI_SEPARATOR = "#";
private ReplicationSpec replicationSpec;
private static boolean nullSrcPathForTest = false;
private Path srcPath;
private Path tgtPath;
- public ManagedTableCopyPath(ReplicationSpec replicationSpec) {
+ public DataCopyPath(ReplicationSpec replicationSpec) {
this.replicationSpec = replicationSpec;
}
- public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+ public DataCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
this.replicationSpec = replicationSpec;
if (srcPath == null) {
throw new IllegalArgumentException("Source path can not be null.");
@@ -200,7 +200,7 @@ public class EximUtil {
@Override
public String toString() {
- return "ManagedTableCopyPath{"
+ return "DataCopyPath{"
+ "fullyQualifiedSourcePath=" + srcPath
+ ", fullyQualifiedTargetPath=" + tgtPath
+ '}';
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 65d4fbf..aad34d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
@@ -75,11 +75,11 @@ class PartitionExport {
this.callersSession = SessionState.get();
}
- List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+ List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
FileList fileList, boolean dataCopyAtLoad)
throws InterruptedException, HiveException {
List<Future<?>> futures = new LinkedList<>();
- List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
+ List<DataCopyPath> managedTableCopyPaths = new LinkedList<>();
ExecutorService producer = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
futures.add(producer.submit(() -> {
@@ -126,7 +126,7 @@ class PartitionExport {
Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
if (!(isExportTask || dataCopyAtLoad)) {
- fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+ fileList.add(new DataCopyPath(forReplicationSpec, partition.getDataLocation(),
dataDumpDir).convertToString());
}
} catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 66cf494..1465b8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -174,7 +174,7 @@ public class TableExport {
List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
replicationSpec, conf);
if (!(isExportTask || dataCopyAtLoad)) {
- fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+ fileList.add(new DataCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
paths.dataExportDir()).convertToString());
}
new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index c9e1041..69671b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -20,13 +20,25 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.EximUtil.DataCopyPath;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage> {
CreateFunctionHandler(NotificationEvent event) {
super(event);
@@ -41,13 +53,37 @@ class CreateFunctionHandler extends AbstractEventHandler<CreateFunctionMessage>
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} CREATE_FUNCTION message : {}", fromEventId(), eventMessageAsJSON);
Path metadataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf);
-
+ boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+ List<DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) {
- new FunctionSerializer(eventMessage.getFunctionObj(), withinContext.hiveConf)
- .writeTo(jsonWriter, withinContext.replicationSpec);
+ FunctionSerializer serializer = new FunctionSerializer(eventMessage.getFunctionObj(),
+ dataPath, copyAtLoad, withinContext.hiveConf);
+ serializer.writeTo(jsonWriter, withinContext.replicationSpec);
+ functionBinaryCopyPaths.addAll(serializer.getFunctionBinaryCopyPaths());
}
withinContext.createDmd(this).write();
+ copyFunctionBinaries(functionBinaryCopyPaths, withinContext.hiveConf);
+ }
+
+ private void copyFunctionBinaries(List<DataCopyPath> functionBinaryCopyPaths, HiveConf hiveConf)
+ throws MetaException, IOException, LoginException, HiveFatalException {
+ if (!functionBinaryCopyPaths.isEmpty()) {
+ String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+ List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+ for (DataCopyPath funcBinCopyPath : functionBinaryCopyPaths) {
+ String [] decodedURISplits = ReplChangeManager.decodeFileUri(funcBinCopyPath.getSrcPath().toString());
+ ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+ decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
+ filePaths.add(fileInfo);
+ Path destRoot = funcBinCopyPath.getTargetPath().getParent();
+ FileSystem dstFs = destRoot.getFileSystem(hiveConf);
+ CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+ copyUtils.copyAndVerify(destRoot, filePaths, funcBinCopyPath.getSrcPath(), false);
+ copyUtils.renameFileCopiedFromCmPath(destRoot, dstFs, filePaths);
+ }
+ }
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index 733bab5..2e87267 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
@@ -40,10 +41,15 @@ public class FunctionSerializer implements JsonWriter.Serializer {
public static final String FIELD_NAME = "function";
private Function function;
private HiveConf hiveConf;
+ private Path functionDataRoot;
+ private boolean copyAtLoad;
+ private List<EximUtil.DataCopyPath> functionBinaryCopyPaths = new ArrayList<>();
- public FunctionSerializer(Function function, HiveConf hiveConf) {
+ public FunctionSerializer(Function function, Path functionDataRoot, boolean copyAtLoad, HiveConf hiveConf) {
this.hiveConf = hiveConf;
this.function = function;
+ this.functionDataRoot = functionDataRoot;
+ this.copyAtLoad = copyAtLoad;
}
@Override
@@ -58,9 +64,16 @@ public class FunctionSerializer implements JsonWriter.Serializer {
FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
- String newFileUri = ReplChangeManager.getInstance(hiveConf)
+ String encodedSrcUri = ReplChangeManager.getInstance(hiveConf)
.encodeFileUri(qualifiedUri.toString(), checkSum, null);
- resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
+ if (copyAtLoad) {
+ resourceUris.add(new ResourceUri(uri.getResourceType(), encodedSrcUri));
+ } else {
+ Path newBinaryPath = new Path(functionDataRoot, qualifiedUri.getName());
+ resourceUris.add(new ResourceUri(uri.getResourceType(),newBinaryPath.toString()));
+ functionBinaryCopyPaths.add(new EximUtil.DataCopyPath(additionalPropertiesProvider,
+ new Path(encodedSrcUri), newBinaryPath));
+ }
} else {
resourceUris.add(uri);
}
@@ -84,4 +97,8 @@ public class FunctionSerializer implements JsonWriter.Serializer {
throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e);
}
}
+
+ public List<EximUtil.DataCopyPath> getFunctionBinaryCopyPaths() {
+ return functionBinaryCopyPaths;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index 948d201..f42290e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import java.io.IOException;
@@ -193,15 +194,20 @@ public class CreateFunctionHandler extends AbstractMessageHandler {
new Path(functionsRootDir).getFileSystem(context.hiveConf)
);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
- metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
- context.hiveConf
- );
- replCopyTasks.add(copyTask);
+ replCopyTasks.add(getCopyTask(sourceUri, qualifiedDestinationPath));
ResourceUri destinationUri =
new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
context.log.debug("copy source uri : {} to destination uri: {}", sourceUri, destinationUri);
return destinationUri;
}
+
+ private Task<?> getCopyTask(String sourceUri, Path dest) {
+ boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+ if (copyAtLoad ) {
+ return ReplCopyTask.getLoadCopyTask(metadata.getReplicationSpec(), new Path(sourceUri), dest, context.hiveConf);
+ } else {
+ return TaskFactory.get(new CopyWork(new Path(sourceUri), dest, true, false), context.hiveConf);
+ }
+ }
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
index 018983f..f69776a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java
@@ -33,6 +33,7 @@ public class CopyWork implements Serializable {
private Path[] fromPath;
private Path[] toPath;
private boolean errorOnSrcEmpty;
+ private boolean overwrite = true;
public CopyWork() {
}
@@ -42,6 +43,12 @@ public class CopyWork implements Serializable {
this.setErrorOnSrcEmpty(errorOnSrcEmpty);
}
+ public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty, boolean overwrite) {
+ this(new Path[] { fromPath }, new Path[] { toPath });
+ this.setErrorOnSrcEmpty(errorOnSrcEmpty);
+ this.setOverwrite(overwrite);
+ }
+
public CopyWork(final Path[] fromPath, final Path[] toPath) {
if (fromPath.length != toPath.length) {
throw new RuntimeException(
@@ -87,4 +94,12 @@ public class CopyWork implements Serializable {
public boolean isErrorOnSrcEmpty() {
return errorOnSrcEmpty;
}
+
+ public boolean isOverwrite() {
+ return overwrite;
+ }
+
+ public void setOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index a6def15..3f47678 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -87,7 +88,9 @@ public class TestReplDumpTask {
}
@Override
- void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
+ List<EximUtil.DataCopyPath> dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot,
+ Hive hiveDb, boolean copyAtLoad) {
+ return Collections.emptyList();
}
@Override
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index b9119a9..4c18223 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -3,6 +3,7 @@ set hive.test.mode=true;
set hive.test.mode.prefix=;
set hive.test.mode.nosamplelist=managed_t,ext_t,managed_t_imported,managed_t_r_imported,ext_t_imported,ext_t_r_imported;
set hive.repl.include.external.tables=true;
+set hive.repl.dump.metadata.only.for.external.table=false;
drop table if exists managed_t;
drop table if exists ext_t;