You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2017/08/08 18:43:28 UTC
[2/2] hive git commit: HIVE-16896: move replication load related work
in semantic analysis phase to execution phase using a task (Anishek Agarwal,
reviewed by Sankar Hariappan, Daniel Dai)
HIVE-16896: move replication load related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/92f764e0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/92f764e0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/92f764e0
Branch: refs/heads/master
Commit: 92f764e056cc87b8c5ea82b5372a9cbbf804ec33
Parents: 844ec34
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Aug 8 11:43:00 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Aug 8 11:43:00 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
...TestReplicationScenariosAcrossInstances.java | 53 +++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 66 +++--
ql/if/queryplan.thrift | 3 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +-
.../hadoop/hive/ql/plan/api/StageType.java | 7 +-
ql/src/gen/thrift/gen-php/Types.php | 6 +-
ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 9 +-
ql/src/gen/thrift/gen-rb/queryplan_types.rb | 7 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 2 +-
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 4 +
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 13 +-
.../ql/exec/repl/bootstrap/ReplLoadTask.java | 264 +++++++++++++++++
.../ql/exec/repl/bootstrap/ReplLoadWork.java | 71 +++++
.../repl/bootstrap/events/BootstrapEvent.java | 28 ++
.../repl/bootstrap/events/DatabaseEvent.java | 34 +++
.../repl/bootstrap/events/FunctionEvent.java | 33 +++
.../repl/bootstrap/events/PartitionEvent.java | 26 ++
.../exec/repl/bootstrap/events/TableEvent.java | 42 +++
.../filesystem/BootstrapEventsIterator.java | 133 +++++++++
.../filesystem/DatabaseEventsIterator.java | 141 +++++++++
.../events/filesystem/FSDatabaseEvent.java | 88 ++++++
.../events/filesystem/FSFunctionEvent.java | 39 +++
.../events/filesystem/FSPartitionEvent.java | 84 ++++++
.../events/filesystem/FSTableEvent.java | 123 ++++++++
.../exec/repl/bootstrap/load/LoadDatabase.java | 129 +++++++++
.../exec/repl/bootstrap/load/LoadFunction.java | 73 +++++
.../repl/bootstrap/load/ReplicationState.java | 58 ++++
.../exec/repl/bootstrap/load/TaskTracker.java | 113 ++++++++
.../bootstrap/load/table/LoadPartitions.java | 283 +++++++++++++++++++
.../repl/bootstrap/load/table/LoadTable.java | 216 ++++++++++++++
.../repl/bootstrap/load/table/TableContext.java | 49 ++++
.../exec/repl/bootstrap/load/util/Context.java | 37 +++
.../repl/bootstrap/load/util/PathUtils.java | 105 +++++++
.../hive/ql/parse/ImportSemanticAnalyzer.java | 28 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 216 +-------------
.../hadoop/hive/ql/plan/ImportTableDesc.java | 12 +-
.../repl/bootstrap/load/TaskTrackerTest.java | 29 ++
.../repl_load_requires_admin.q.out | 4 -
40 files changed, 2356 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c16880e..7cee344 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.conf;
import com.google.common.base.Joiner;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
@@ -37,7 +36,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Shell;
import org.apache.hive.common.HiveCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -442,6 +440,11 @@ public class HiveConf extends Configuration {
"Inteval for cmroot cleanup thread."),
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
"Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
+ REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000,
+ "Provide an approximation of the maximum number of tasks that should be executed before \n" +
+ "dynamically generating the next set of tasks. The number is approximate as Hive \n" +
+ "will stop at a slightly higher number, the reason being some events might lead to a \n" +
+ "task increment that would cross the specified limit"),
REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
"Number of threads that will be used to dump partition data information during repl dump."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index c431537..2af728f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
import org.junit.AfterClass;
@@ -89,7 +90,7 @@ public class TestReplicationScenariosAcrossInstances {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
replica.load(replicatedDbName, bootStrapDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(bootStrapDump.lastReplicationId);
+ .verifyResult(bootStrapDump.lastReplicationId);
primary.run("CREATE FUNCTION " + primaryDbName
+ ".testFunction as 'hivemall.tools.string.StopwordUDF' "
@@ -99,16 +100,16 @@ public class TestReplicationScenariosAcrossInstances {
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(incrementalDump.lastReplicationId)
+ .verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
- .verify(replicatedDbName + ".testFunction");
+ .verifyResult(replicatedDbName + ".testFunction");
// Test the idempotent behavior of CREATE FUNCTION
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(incrementalDump.lastReplicationId)
+ .verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
- .verify(replicatedDbName + ".testFunction");
+ .verifyResult(replicatedDbName + ".testFunction");
}
@Test
@@ -119,7 +120,7 @@ public class TestReplicationScenariosAcrossInstances {
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
replica.load(replicatedDbName, bootStrapDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(bootStrapDump.lastReplicationId);
+ .verifyResult(bootStrapDump.lastReplicationId);
primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
@@ -127,16 +128,16 @@ public class TestReplicationScenariosAcrossInstances {
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(incrementalDump.lastReplicationId)
+ .verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '*testfunction*'")
- .verify(null);
+ .verifyResult(null);
// Test the idempotent behavior of DROP FUNCTION
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verify(incrementalDump.lastReplicationId)
+ .verifyResult(incrementalDump.lastReplicationId)
.run("SHOW FUNCTIONS LIKE '*testfunction*'")
- .verify(null);
+ .verifyResult(null);
}
@Test
@@ -148,7 +149,7 @@ public class TestReplicationScenariosAcrossInstances {
replica.load(replicatedDbName, bootStrapDump.dumpLocation)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
- .verify(replicatedDbName + ".testFunction");
+ .verifyResult(replicatedDbName + ".testFunction");
}
@Test
@@ -164,7 +165,7 @@ public class TestReplicationScenariosAcrossInstances {
replica.load(replicatedDbName, tuple.dumpLocation)
.run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
- .verify(replicatedDbName + ".anotherFunction");
+ .verifyResult(replicatedDbName + ".anotherFunction");
FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus(
new Path(
@@ -218,4 +219,32 @@ public class TestReplicationScenariosAcrossInstances {
}).collect(Collectors.toList());
return new Dependencies(collect);
}
+
+ /*
+ From the hive logs (hive.log) we can also check for the info statement:
+ fgrep "Total Tasks" [location of hive.log]
+ Each matching line indicates one run of ReplLoadTask.
+ */
+ @Test
+ public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("create table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='india') values ('mumbai')")
+ .run("insert into table t2 partition(country='india') values ('delhi')")
+ .run("create table t3 (rank int)")
+ .dump(primaryDbName, null);
+
+ // each table creation itself takes more than one task; given we are allowing a max of 1, we should hit multiple runs.
+ replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1);
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] { "t1", "t2", "t3" })
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 8dfab08..c084d4d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -56,7 +56,7 @@ class WarehouseInstance implements Closeable {
final String functionsRoot;
private Logger logger;
private Driver driver;
- private HiveConf hiveConf;
+ HiveConf hiveConf;
MiniDFSCluster miniDFSCluster;
private HiveMetaStoreClient client;
@@ -71,16 +71,18 @@ class WarehouseInstance implements Closeable {
assert miniDFSCluster.isDataNodeUp();
DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+ Path warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
- initialize(cmRootPath.toString(), hiveInTests);
+ initialize(cmRootPath.toString(), warehouseRoot.toString(), hiveInTests);
}
WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
this(logger, cluster, true);
}
- private void initialize(String cmRoot, boolean hiveInTest) throws Exception {
+ private void initialize(String cmRoot, String warehouseRoot, boolean hiveInTest)
+ throws Exception {
hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
@@ -95,6 +97,7 @@ class WarehouseInstance implements Closeable {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
// turn on db notification listener on meta store
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot);
hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -181,38 +184,39 @@ class WarehouseInstance implements Closeable {
return this;
}
- WarehouseInstance verify(String data) throws IOException {
- return verifyResults(data == null ? new String[] {} : new String[] { data });
- }
+ WarehouseInstance verifyResult (String data) throws IOException {
+ verifyResults(data == null ? new String[] {} : new String[] { data });
+ return this;
+ }
- /**
- * All the results that are read from the hive output will not preserve
- * case sensitivity and will all be in lower case, hence we will check against
- * only lower case data values.
- * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
- * before assert.
- */
- WarehouseInstance verifyResults(String[] data) throws IOException {
- List<String> results = getOutput();
- logger.info("Expecting {}", StringUtils.join(data, ","));
- logger.info("Got {}", results);
- assertEquals(data.length, results.size());
- for (int i = 0; i < data.length; i++) {
- assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
+ /**
+ * All the results that are read from the hive output will not preserve
+ * case sensitivity and will all be in lower case, hence we will check against
+ * only lower case data values.
+ * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
+ * before assert.
+ */
+ WarehouseInstance verifyResults(String[] data) throws IOException {
+ List<String> results = getOutput();
+ logger.info("Expecting {}", StringUtils.join(data, ","));
+ logger.info("Got {}", results);
+ assertEquals(data.length, results.size());
+ for (int i = 0; i < data.length; i++) {
+ assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
+ }
+ return this;
}
- return this;
- }
- List<String> getOutput() throws IOException {
- List<String> results = new ArrayList<>();
- try {
- driver.getResults(results);
- } catch (CommandNeedRetryException e) {
- logger.warn(e.getMessage(), e);
- throw new RuntimeException(e);
+ List<String> getOutput() throws IOException {
+ List<String> results = new ArrayList<>();
+ try {
+ driver.getResults(results);
+ } catch (CommandNeedRetryException e) {
+ logger.warn(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ return results;
}
- return results;
- }
private void printOutput() throws IOException {
for (String s : getOutput()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index dc55805..00b0200 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -100,7 +100,8 @@ enum StageType {
STATS,
DEPENDENCY_COLLECTION,
COLUMNSTATS,
- REPLDUMP,
+ REPL_DUMP,
+ REPL_BOOTSTRAP_LOAD,
}
struct Stage {
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 7254d50..f467da2 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -116,7 +116,8 @@ int _kStageTypeValues[] = {
StageType::STATS,
StageType::DEPENDENCY_COLLECTION,
StageType::COLUMNSTATS,
- StageType::REPLDUMP
+ StageType::REPL_DUMP,
+ StageType::REPL_BOOTSTRAP_LOAD
};
const char* _kStageTypeNames[] = {
"CONDITIONAL",
@@ -131,9 +132,10 @@ const char* _kStageTypeNames[] = {
"STATS",
"DEPENDENCY_COLLECTION",
"COLUMNSTATS",
- "REPLDUMP"
+ "REPL_DUMP",
+ "REPL_BOOTSTRAP_LOAD"
};
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
Adjacency::~Adjacency() throw() {
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index 38d054b..ac87ef7 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -94,7 +94,8 @@ struct StageType {
STATS = 9,
DEPENDENCY_COLLECTION = 10,
COLUMNSTATS = 11,
- REPLDUMP = 12
+ REPL_DUMP = 12,
+ REPL_BOOTSTRAP_LOAD = 13
};
};
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index deca574..11a8f6d 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -24,7 +24,8 @@ public enum StageType implements org.apache.thrift.TEnum {
STATS(9),
DEPENDENCY_COLLECTION(10),
COLUMNSTATS(11),
- REPLDUMP(12);
+ REPL_DUMP(12),
+ REPL_BOOTSTRAP_LOAD(13);
private final int value;
@@ -70,7 +71,9 @@ public enum StageType implements org.apache.thrift.TEnum {
case 11:
return COLUMNSTATS;
case 12:
- return REPLDUMP;
+ return REPL_DUMP;
+ case 13:
+ return REPL_BOOTSTRAP_LOAD;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index 4d902ee..68edfcd 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -114,7 +114,8 @@ final class StageType {
const STATS = 9;
const DEPENDENCY_COLLECTION = 10;
const COLUMNSTATS = 11;
- const REPLDUMP = 12;
+ const REPL_DUMP = 12;
+ const REPL_BOOTSTRAP_LOAD = 13;
static public $__names = array(
0 => 'CONDITIONAL',
1 => 'COPY',
@@ -128,7 +129,8 @@ final class StageType {
9 => 'STATS',
10 => 'DEPENDENCY_COLLECTION',
11 => 'COLUMNSTATS',
- 12 => 'REPLDUMP',
+ 12 => 'REPL_DUMP',
+ 13 => 'REPL_BOOTSTRAP_LOAD',
);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 9e29129..6bf65af 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -160,7 +160,8 @@ class StageType:
STATS = 9
DEPENDENCY_COLLECTION = 10
COLUMNSTATS = 11
- REPLDUMP = 12
+ REPL_DUMP = 12
+ REPL_BOOTSTRAP_LOAD = 13
_VALUES_TO_NAMES = {
0: "CONDITIONAL",
@@ -175,7 +176,8 @@ class StageType:
9: "STATS",
10: "DEPENDENCY_COLLECTION",
11: "COLUMNSTATS",
- 12: "REPLDUMP",
+ 12: "REPL_DUMP",
+ 13: "REPL_BOOTSTRAP_LOAD",
}
_NAMES_TO_VALUES = {
@@ -191,7 +193,8 @@ class StageType:
"STATS": 9,
"DEPENDENCY_COLLECTION": 10,
"COLUMNSTATS": 11,
- "REPLDUMP": 12,
+ "REPL_DUMP": 12,
+ "REPL_BOOTSTRAP_LOAD": 13,
}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 1433d4a..2730dde 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -72,9 +72,10 @@ module StageType
STATS = 9
DEPENDENCY_COLLECTION = 10
COLUMNSTATS = 11
- REPLDUMP = 12
- VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"}
- VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze
+ REPL_DUMP = 12
+ REPL_BOOTSTRAP_LOAD = 13
+ VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD"}
+ VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD]).freeze
end
class Adjacency
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index fdcf052..9183edf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -561,7 +561,7 @@ public class Context {
private static final String MR_PREFIX = "-mr-";
- private static final String EXT_PREFIX = "-ext-";
+ public static final String EXT_PREFIX = "-ext-";
private static final String LOCAL_PREFIX = "-local-";
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 94d6c5a..91ac4bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
@@ -111,6 +113,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
+ taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
}
private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
@@ -149,6 +152,7 @@ public final class TaskFactory {
throw new RuntimeException("No task for work class " + workClass.getName());
}
+ @SafeVarargs
public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
Task<? extends Serializable>... tasklist) {
Task<T> ret = get((Class<T>) work.getClass(), conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 7501ed7..34b6737 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -61,7 +61,7 @@ import java.util.List;
public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
- private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+ private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
@@ -86,6 +86,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
} catch (Exception e) {
LOG.error("failed", e);
+ setException(e);
return 1;
}
return 0;
@@ -262,10 +263,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// make it easy to write .q unit tests, instead of unique id generation.
// however, this does mean that in writing tests, we have to be aware that
// repl dump will clash with prior dumps, and thus have to clean up properly.
- if (work.testInjectDumpDir == null) {
+ if (ReplDumpWork.testInjectDumpDir == null) {
return "next";
} else {
- return work.testInjectDumpDir;
+ return ReplDumpWork.testInjectDumpDir;
}
} else {
return String.valueOf(System.currentTimeMillis());
@@ -284,9 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
continue;
}
Path functionRoot = new Path(functionsRoot, functionName);
- Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
+ Path functionMetadataFile = new Path(functionRoot, FUNCTION_METADATA_FILE_NAME);
try (JsonWriter jsonWriter =
- new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) {
+ new JsonWriter(functionMetadataFile.getFileSystem(conf), functionMetadataFile)) {
FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
serializer.writeTo(jsonWriter, tuple.replicationSpec);
}
@@ -315,6 +316,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
@Override
public StageType getType() {
- return StageType.REPLDUMP;
+ return StageType.REPL_DUMP;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
new file mode 100644
index 0000000..6ea1754
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -0,0 +1,264 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+
+public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
+ private final static int ZERO_TASKS = 0;
+
+ @Override
+ public String getName() {
+ return "REPL_BOOTSTRAP_LOAD";
+ }
+
+ /**
+ * Provides the root Tasks created as a result of this loadTask run which will be executed
+ * by the driver. It does not track details across multiple runs of LoadTask.
+ */
+ private static class Scope {
+ boolean database = false, table = false, partition = false;
+ List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
+ }
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ try {
+ int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+ Context context = new Context(conf, getHive());
+ TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+ /*
+ for now for simplicity we are doing just one directory ( one database ), come back to use
+ of multiple databases once we have the basic flow to chain creating of tasks in place for
+ a database ( directory )
+ */
+ BootstrapEventsIterator iterator = work.iterator();
+ /*
+ This is used to get hold of a reference during the current creation of tasks and is initialized
+ with "0" tasks such that it will be non consequential in any operations done with task tracker
+ compositions.
+ */
+ TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+ TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+ Scope scope = new Scope();
+ while (iterator.hasNext() && loadTaskTracker.canAddMoreTasks()) {
+ BootstrapEvent next = iterator.next();
+ switch (next.eventType()) {
+ case Database:
+ DatabaseEvent dbEvent = (DatabaseEvent) next;
+ dbTracker =
+ new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
+ .tasks();
+ loadTaskTracker.update(dbTracker);
+ if (work.hasDbState()) {
+ loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+ }
+ work.updateDbEventState(dbEvent.toState());
+ scope.database = true;
+ scope.rootTasks.addAll(dbTracker.tasks());
+ dbTracker.debugLog("database");
+ break;
+ case Table: {
+ /*
+ Implicit assumption here is that database level is processed first before table level,
+ which will depend on the iterator used since it should provide the higher level directory
+ listing before providing the lower level listing. This is also required such that
+ the dbTracker / tableTracker are setup correctly always.
+ */
+ TableContext tableContext =
+ new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
+ TableEvent tableEvent = (TableEvent) next;
+ LoadTable loadTable = new LoadTable(tableEvent, context, tableContext, loadTaskTracker);
+ tableTracker = loadTable.tasks();
+ if (!scope.database) {
+ scope.rootTasks.addAll(tableTracker.tasks());
+ scope.table = true;
+ }
+ setUpDependencies(dbTracker, tableTracker);
+ /*
+ for table replication if we reach the max number of tasks then for the next run we will
+ try to reload the same table again, this is mainly for ease of understanding the code
+ as then we can avoid handling == > loading partitions for the table given that
+ the creation of table lead to reaching max tasks vs, loading next table since current
+ one does not have partitions.
+ */
+
+ // for a table we explicitly try to load partitions as there is no separate partitions events.
+ LoadPartitions loadPartitions =
+ new LoadPartitions(context, loadTaskTracker, tableEvent, work.dbNameToLoadIn,
+ tableContext);
+ TaskTracker partitionsTracker = loadPartitions.tasks();
+ partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+ partitionsTracker);
+ tableTracker.debugLog("table");
+ partitionsTracker.debugLog("partitions for table");
+ break;
+ }
+ case Partition: {
+ /*
+ This will happen only when loading tables and we reach the limit of number of tasks we can create;
+ hence we know here that the table should exist and there should be a lastPartitionName
+ */
+ PartitionEvent event = (PartitionEvent) next;
+ TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
+ work.tableNameToLoadIn);
+ LoadPartitions loadPartitions =
+ new LoadPartitions(context, tableContext, loadTaskTracker, event.asTableEvent(),
+ work.dbNameToLoadIn,
+ event.lastPartitionReplicated());
+ /*
+ the tableTracker here should be a new instance and not an existing one as this can
+ only happen when we break in between loading partitions.
+ */
+ TaskTracker partitionsTracker = loadPartitions.tasks();
+ partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+ partitionsTracker);
+ partitionsTracker.debugLog("partitions");
+ break;
+ }
+ case Function: {
+ LoadFunction loadFunction =
+ new LoadFunction(context, (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+ TaskTracker functionsTracker = loadFunction.tasks();
+ if (!scope.database) {
+ scope.rootTasks.addAll(functionsTracker.tasks());
+ } else {
+ setUpDependencies(dbTracker, functionsTracker);
+ }
+ loadTaskTracker.update(functionsTracker);
+ functionsTracker.debugLog("functions");
+ break;
+ }
+ }
+ }
+ boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState();
+ createBuilderTask(scope.rootTasks, addAnotherLoadTask);
+ if (!iterator.hasNext()) {
+ loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+ }
+ this.childTasks = scope.rootTasks;
+ LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+ } catch (Exception e) {
+ LOG.error("failed replication", e);
+ setException(e);
+ return 1;
+ }
+ LOG.info("completed load task run : {}", work.executedLoadTask());
+ return 0;
+ }
+
+ /**
+ * There was a database update done before and we want to make sure we update the last repl
+ * id on this database as we are now going to switch to processing a new database.
+ */
+ private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
+ throws SemanticException {
+ /*
+ we don't want to put any limits on this task as this is essential before we start
+ processing new database events.
+ */
+ TaskTracker taskTracker =
+ new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
+ new TaskTracker(maxTasks)).tasks();
+ scope.rootTasks.addAll(taskTracker.tasks());
+ return taskTracker;
+ }
+
+ private void partitionsPostProcessing(BootstrapEventsIterator iterator,
+ Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+ TaskTracker partitionsTracker) throws SemanticException {
+ setUpDependencies(tableTracker, partitionsTracker);
+ if (!scope.database && !scope.table) {
+ scope.rootTasks.addAll(partitionsTracker.tasks());
+ scope.partition = true;
+ }
+ loadTaskTracker.update(tableTracker);
+ loadTaskTracker.update(partitionsTracker);
+ if (partitionsTracker.hasReplicationState()) {
+ iterator.setReplicationState(partitionsTracker.replicationState());
+ }
+ }
+
+ /*
+ This sets up dependencies such that a child task is dependant on the parent to be complete.
+ */
+ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
+ for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+ for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+ parentTask.addDependentTask(childTask);
+ }
+ }
+ }
+
+ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
+ boolean shouldCreateAnotherLoadTask) {
+ /*
+ use loadTask as dependencyCollection
+ */
+ if (shouldCreateAnotherLoadTask) {
+ Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+ dependency(rootTasks, loadTask);
+ }
+ }
+
+ /**
+ * add the dependency to the leaf node
+ */
+ private boolean dependency(List<Task<? extends Serializable>> tasks,
+ Task<ReplLoadWork> loadTask) {
+ if (tasks == null || tasks.isEmpty()) {
+ return true;
+ }
+ for (Task<? extends Serializable> task : tasks) {
+ boolean dependency = dependency(task.getChildTasks(), loadTask);
+ if (dependency) {
+ task.addDependentTask(loadTask);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.REPL_BOOTSTRAP_LOAD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
new file mode 100644
index 0000000..eb18e5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -0,0 +1,71 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+/**
+ * Work object shared by successive bootstrap load task runs: it owns the dump event iterator,
+ * counts how many runs have completed and carries the last database event's state from one run
+ * to the next.
+ */
+public class ReplLoadWork implements Serializable {
+  final String dbNameToLoadIn;
+  final String tableNameToLoadIn;
+  private final BootstrapEventsIterator iterator;
+  private int loadTaskRunCount = 0;
+  private DatabaseEvent.State state = null;
+
+  /**
+   * @param hiveConf configuration used to reach the dump's file system
+   * @param dumpDirectory root directory of the replication dump
+   * @param dbNameToLoadIn target database name
+   * @param tableNameToLoadIn target table name; null when built via the three-argument constructor
+   * @throws IOException if listing the dump directory fails
+   */
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
+      String tableNameToLoadIn) throws IOException {
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.tableNameToLoadIn = tableNameToLoadIn;
+    this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
+  }
+
+  /** Same as the four-argument constructor with no table name. */
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern)
+      throws IOException {
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null);
+  }
+
+  /** @return the single iterator shared by every load run using this work */
+  public BootstrapEventsIterator iterator() {
+    return iterator;
+  }
+
+  /** Bumps the completed-run counter and returns the new value. */
+  int executedLoadTask() {
+    loadTaskRunCount += 1;
+    return loadTaskRunCount;
+  }
+
+  /** Remembers the state of the most recently processed database event. */
+  void updateDbEventState(DatabaseEvent.State state) {
+    this.state = state;
+  }
+
+  /**
+   * Rebuilds the remembered database event and then forgets it; the stored state is single-use.
+   * Assumes {@link #hasDbState()} is true.
+   */
+  DatabaseEvent databaseEvent(HiveConf hiveConf) {
+    DatabaseEvent rebuilt = state.toEvent(hiveConf);
+    this.state = null;
+    return rebuilt;
+  }
+
+  /** @return whether a database event state is currently remembered */
+  boolean hasDbState() {
+    return this.state != null;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
new file mode 100644
index 0000000..db2b0ac
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/BootstrapEvent.java
@@ -0,0 +1,28 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+/**
+ * Base type of every event produced while walking a bootstrap dump. Consumers switch on
+ * {@link #eventType()} and cast to the matching sub-interface.
+ */
+public interface BootstrapEvent {
+
+  /** The kinds of bootstrap events; constant order is significant and kept as declared. */
+  enum EventType {
+    Database, Table, Function, Partition
+  }
+
+  /** @return which kind of event this is */
+  EventType eventType();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
new file mode 100644
index 0000000..6d6c336
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/DatabaseEvent.java
@@ -0,0 +1,34 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.io.Serializable;
+
+/**
+ * A database found in a bootstrap dump.
+ */
+public interface DatabaseEvent extends BootstrapEvent {
+  /**
+   * Serializable snapshot of a database event that can be stored between load runs and turned
+   * back into an event later.
+   */
+  interface State extends Serializable {
+    DatabaseEvent toEvent(HiveConf hiveConf);
+  }
+
+  /** @return a snapshot from which this event can be rebuilt */
+  State toState();
+
+  /**
+   * @param dbNameToOverride target database name — presumably replaces the dumped name; confirm
+   *     against the implementation
+   * @return the database object read from the dump's metadata
+   */
+  Database dbInMetadata(String dbNameToOverride) throws SemanticException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
new file mode 100644
index 0000000..30bb747
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/FunctionEvent.java
@@ -0,0 +1,33 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A function found in a bootstrap dump.
+ * <p>
+ * Handing out the file-system location through {@link #rootDir()} leaks the implementation,
+ * which it ideally should not. Bootstrap and incremental loads treat functions in much the same
+ * way, and further work is needed so the same event object can be passed from both places.
+ *
+ * @see org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.FunctionDescBuilder
+ * which would mostly be merged here.
+ */
+public interface FunctionEvent extends BootstrapEvent {
+  /** @return the function's directory in the dump */
+  Path rootDir();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
new file mode 100644
index 0000000..3b260d6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
@@ -0,0 +1,26 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+
+/**
+ * Continues loading the partitions of a table whose load was cut short by the task limit in an
+ * earlier load run.
+ */
+public interface PartitionEvent extends TableEvent {
+  /** @return the same table, viewed as a plain table event */
+  TableEvent asTableEvent();
+
+  /** @return the last partition handled before the previous run stopped */
+  AddPartitionDesc lastPartitionReplicated();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
new file mode 100644
index 0000000..e817f5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -0,0 +1,42 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+
+import java.util.List;
+
+/**
+ * A table found in a bootstrap dump.
+ */
+public interface TableEvent extends BootstrapEvent {
+  /**
+   * Exposing the FileSystem implementation outside which is what it should NOT do.
+   */
+  Path metadataPath();
+
+  ReplicationSpec replicationSpec();
+
+  boolean shouldNotReplicate();
+
+  /** @param dbName database the table is loaded into — TODO confirm override semantics */
+  ImportTableDesc tableDesc(String dbName) throws SemanticException;
+
+  /** @return partition descriptions for the table described by {@code tblDesc} */
+  List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+      throws SemanticException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
new file mode 100644
index 0000000..4e635ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -0,0 +1,133 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Iterates over all bootstrap events of a replication dump. Below the dump root, every directory
+ * that holds a {@code _metadata} file is treated as a database, laid out like:
+ * <pre>
+ * db
+ *   _metadata
+ *   table1
+ *     _metadata
+ *     (table data)
+ *   table2
+ *     _metadata
+ *     (table data)
+ *   _functions
+ *     functionName1
+ *       _metadata
+ *     functionName2
+ *       _metadata
+ * </pre>
+ * This class understands that layout, so subsequent bootstrap load tasks can find where the
+ * previous run stopped and continue from there. Because tasks form a hierarchy, db level events
+ * must be processed before table events, and table events before partitions.
+ * <p>
+ * Given how metadata is currently exported:
+ * <ol>
+ * <li>db level events are produced first, as they are needed before table / function
+ * processing;</li>
+ * <li>table-before-partition ordering is implicit, since table and partition metadata live in the
+ * same file.</li>
+ * </ol>
+ * Other event sources (e.g. kafka) would need their own {@code Iterator<BootstrapEvent>}.
+ */
+public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
+  private DatabaseEventsIterator currentDatabaseIterator = null;
+  /*
+    This denotes listing of any directories where during replication we want to take care of
+    db level operations first, namely in our case its only during db creation on the replica
+    warehouse.
+  */
+  private final Iterator<DatabaseEventsIterator> dbEventsIterator;
+
+  /**
+   * @param dumpDirectory root of the replication dump
+   * @param hiveConf configuration used to reach the dump's file system
+   * @throws IOException if the dump root cannot be listed
+   */
+  public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
+    Path path = new Path(dumpDirectory);
+    FileSystem fileSystem = path.getFileSystem(hiveConf);
+    FileStatus[] fileStatuses =
+        fileSystem.listStatus(path, EximUtil.getDirectoryFilter(fileSystem));
+
+    // only directories carrying their own _metadata file are databases
+    List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
+      Path metadataPath = new Path(f.getPath(), EximUtil.METADATA_NAME);
+      try {
+        return fileSystem.exists(metadataPath);
+      } catch (IOException e) {
+        throw new RuntimeException("could not determine if exists : " + metadataPath.toString(), e);
+      }
+    }).collect(Collectors.toList());
+    dbEventsIterator = dbsToCreate.stream().map(f -> {
+      try {
+        return new DatabaseEventsIterator(f.getPath(), hiveConf);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Error while creating event iterator for db at path " + f.getPath().toString(), e);
+      }
+    }).collect(Collectors.toList()).iterator();
+  }
+
+  /**
+   * Advances to the next database that still has events. Safe to call repeatedly: once an
+   * iterator with pending events is found, it stays current until drained.
+   */
+  @Override
+  public boolean hasNext() {
+    while (true) {
+      if (currentDatabaseIterator == null) {
+        if (dbEventsIterator.hasNext()) {
+          currentDatabaseIterator = dbEventsIterator.next();
+        } else {
+          return false;
+        }
+      } else if (currentDatabaseIterator.hasNext()) {
+        return true;
+      } else {
+        currentDatabaseIterator = null;
+      }
+    }
+  }
+
+  /**
+   * @return the next event of the current database
+   * @throws java.util.NoSuchElementException if no events remain
+   */
+  @Override
+  public BootstrapEvent next() {
+    // hasNext() positions currentDatabaseIterator; without it an exhausted iterator would NPE
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException("no more bootstrap events");
+    }
+    return currentDatabaseIterator.next();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  @Override
+  public void forEachRemaining(Consumer<? super BootstrapEvent> action) {
+    throw new UnsupportedOperationException("This operation is not supported");
+  }
+
+  /**
+   * Records where the current database's load stopped, so its next event resumes from there.
+   * Must only be called after an event has been obtained from this iterator.
+   */
+  public void setReplicationState(ReplicationState replicationState) {
+    this.currentDatabaseIterator.replicationState = replicationState;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
new file mode 100644
index 0000000..3100875
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -0,0 +1,141 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME;
+
+class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
+ private static Logger LOG = LoggerFactory.getLogger(DatabaseEventsIterator.class);
+ private RemoteIterator<LocatedFileStatus> remoteIterator;
+
+ private final Path dbLevelPath;
+ private HiveConf hiveConf;
+ ReplicationState replicationState;
+ private Path next = null, previous = null;
+ private boolean databaseEventProcessed = false;
+
+ DatabaseEventsIterator(Path dbLevelPath, HiveConf hiveConf) throws IOException {
+ this.dbLevelPath = dbLevelPath;
+ this.hiveConf = hiveConf;
+ FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf);
+ // this is only there for the use case where we are doing table only replication and not database level
+ if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) {
+ databaseEventProcessed = true;
+ }
+ remoteIterator = fileSystem.listFiles(dbLevelPath, true);
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (!databaseEventProcessed) {
+ next = dbLevelPath;
+ return true;
+ }
+
+ if (replicationState == null && next == null) {
+ while (remoteIterator.hasNext()) {
+ LocatedFileStatus next = remoteIterator.next();
+ if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
+ String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
+ List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toList());
+ if (filteredNames.size() == 1) {
+ // this relates to db level event tracked via databaseEventProcessed
+ } else {
+ this.next = next.getPath().getParent();
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ // may be do some retry logic here.
+ throw new RuntimeException("could not traverse the file via remote iterator " + dbLevelPath,
+ e);
+ }
+ }
+
+ /*
+ we handle three types of scenarios with special case.
+ 1. handling of db Level _metadata
+ 2. handling of subsequent loadTask which will start running from the previous replicationState
+ 3. other events : these can only be either table / function _metadata.
+ */
+ @Override
+ public BootstrapEvent next() {
+ if (!databaseEventProcessed) {
+ FSDatabaseEvent event = new FSDatabaseEvent(hiveConf, next.toString());
+ databaseEventProcessed = true;
+ return postProcessing(event);
+ }
+
+ if (replicationState != null) {
+ return eventForReplicationState();
+ }
+
+ String currentPath = next.toString();
+ if (currentPath.contains(FUNCTIONS_ROOT_DIR_NAME)) {
+ LOG.debug("functions directory: {}", next.toString());
+ return postProcessing(new FSFunctionEvent(next));
+ }
+ return postProcessing(new FSTableEvent(hiveConf, next.toString()));
+ }
+
+ private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) {
+ previous = next;
+ next = null;
+ LOG.debug("processing " + previous);
+ return bootstrapEvent;
+ }
+
+ private BootstrapEvent eventForReplicationState() {
+ if (replicationState.partitionState != null) {
+ BootstrapEvent
+ bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState);
+ replicationState = null;
+ return bootstrapEvent;
+ } else if (replicationState.lastTableReplicated != null) {
+ FSTableEvent event = new FSTableEvent(hiveConf, previous.toString());
+ replicationState = null;
+ return event;
+ }
+ throw new IllegalStateException("for replicationState " + replicationState.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
new file mode 100644
index 0000000..48c908a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSDatabaseEvent.java
@@ -0,0 +1,88 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+
+/**
+ * Database bootstrap event backed by the {@code _metadata} file inside a database dump directory.
+ */
+public class FSDatabaseEvent implements DatabaseEvent {
+
+  private final Path dbMetadataFile;
+  private final FileSystem fileSystem;
+
+  /**
+   * @param hiveConf configuration used to resolve the filesystem holding the dump
+   * @param dbDumpDirectory database dump directory; its _metadata file is read later, in
+   *     {@link #dbInMetadata(String)}
+   * @throws RuntimeException if the path cannot be built or its filesystem cannot be resolved
+   */
+  FSDatabaseEvent(HiveConf hiveConf, String dbDumpDirectory) {
+    try {
+      this.dbMetadataFile = new Path(dbDumpDirectory, EximUtil.METADATA_NAME);
+      this.fileSystem = dbMetadataFile.getFileSystem(hiveConf);
+    } catch (Exception e) {
+      String message = "Error while identifying the filesystem for db "
+          + "metadata file in " + dbDumpDirectory;
+      throw new RuntimeException(message, e);
+    }
+  }
+
+  /**
+   * Reads the database object from the dump's _metadata file.
+   *
+   * @param dbNameToOverride if not blank, replaces the database name found in the dump
+   * @return the database object from the dump, possibly renamed
+   * @throws SemanticException if the file cannot be read or contains no database object
+   */
+  @Override
+  public Database dbInMetadata(String dbNameToOverride) throws SemanticException {
+    Database dbObj;
+    try {
+      MetaData rv = EximUtil.readMetaData(fileSystem, dbMetadataFile);
+      dbObj = rv.getDatabase();
+    } catch (SemanticException e) {
+      // already the declared type; avoid wrapping it a second time
+      throw e;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+    if (dbObj == null) {
+      throw new SemanticException("_metadata file " + dbMetadataFile
+          + " read did not contain a db object - invalid dump.");
+    }
+
+    // override the db name if provided in repl load command
+    if (StringUtils.isNotBlank(dbNameToOverride)) {
+      dbObj.setName(dbNameToOverride);
+    }
+    return dbObj;
+  }
+
+  @Override
+  public State toState() {
+    return new FSDBState(dbMetadataFile.getParent().toString());
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Database;
+  }
+
+  /** Records the dump directory so an equivalent event can be re-created later. */
+  static class FSDBState implements DatabaseEvent.State {
+    final String dbDumpDirectory;
+
+    FSDBState(String dbDumpDirectory) {
+      this.dbDumpDirectory = dbDumpDirectory;
+    }
+
+    @Override
+    public DatabaseEvent toEvent(HiveConf hiveConf) {
+      return new FSDatabaseEvent(hiveConf, dbDumpDirectory);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
new file mode 100644
index 0000000..5b13ffc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSFunctionEvent.java
@@ -0,0 +1,39 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+
+/**
+ * Function bootstrap event backed by a directory in the dump; it only carries that path.
+ */
+public class FSFunctionEvent implements FunctionEvent {
+  private final Path functionDumpDir;
+
+  FSFunctionEvent(Path rootDir) {
+    functionDumpDir = rootDir;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Function;
+  }
+
+  /** @return the dump directory this event was created for */
+  @Override
+  public Path rootDir() {
+    return functionDumpDir;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
new file mode 100644
index 0000000..9b71a2e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -0,0 +1,84 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+
+import java.util.List;
+
+/**
+ * Partition bootstrap event used when resuming a table load part-way through its partitions.
+ * Table-level accessors delegate to an {@link FSTableEvent} over the same metadata directory.
+ */
+public class FSPartitionEvent implements PartitionEvent {
+
+  private final ReplicationState replicationState;
+  private final TableEvent tableEvent;
+
+  FSPartitionEvent(HiveConf hiveConf, String metadataDir,
+      ReplicationState replicationState) {
+    tableEvent = new FSTableEvent(hiveConf, metadataDir);
+    this.replicationState = replicationState;
+  }
+
+  @Override
+  public EventType eventType() {
+    return EventType.Partition;
+  }
+
+  /**
+   * @return the last partition recorded in the replication state
+   * @throws IllegalStateException if no partition state was supplied. An explicit check is used
+   *     instead of {@code assert}, which is disabled in production and would otherwise surface as
+   *     a bare NullPointerException.
+   */
+  @Override
+  public AddPartitionDesc lastPartitionReplicated() {
+    if (replicationState == null || replicationState.partitionState == null) {
+      throw new IllegalStateException(
+          "no partition state available to resume partition replication from");
+    }
+    return replicationState.partitionState.lastReplicatedPartition;
+  }
+
+  @Override
+  public TableEvent asTableEvent() {
+    return tableEvent;
+  }
+
+  @Override
+  public ImportTableDesc tableDesc(String dbName) throws SemanticException {
+    return tableEvent.tableDesc(dbName);
+  }
+
+  @Override
+  public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+      throws SemanticException {
+    return tableEvent.partitionDescriptions(tblDesc);
+  }
+
+  @Override
+  public ReplicationSpec replicationSpec() {
+    return tableEvent.replicationSpec();
+  }
+
+  @Override
+  public boolean shouldNotReplicate() {
+    return tableEvent.shouldNotReplicate();
+  }
+
+  @Override
+  public Path metadataPath() {
+    return tableEvent.metadataPath();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
new file mode 100644
index 0000000..f313404
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -0,0 +1,123 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FSTableEvent implements TableEvent {
+ private final Path fromPath;
+ private final MetaData metadata;
+
+ FSTableEvent(HiveConf hiveConf, String metadataDir) {
+ try {
+ URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir));
+ fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+ FileSystem fs = FileSystem.get(fromURI, hiveConf);
+ metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean shouldNotReplicate() {
+ ReplicationSpec spec = metadata.getReplicationSpec();
+ return spec.isNoop() || !spec.isInReplicationScope();
+ }
+
+ @Override
+ public Path metadataPath() {
+ return fromPath;
+ }
+
+ @Override
+ public ImportTableDesc tableDesc(String dbName) throws SemanticException {
+ try {
+ Table table = new Table(metadata.getTable());
+ ImportTableDesc tableDesc = new ImportTableDesc(dbName, table);
+ tableDesc.setReplicationSpec(metadata.getReplicationSpec());
+ return tableDesc;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ @Override
+ public List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
+ throws SemanticException {
+ List<AddPartitionDesc> descs = new ArrayList<>();
+ //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose.
+ for (Partition partition : metadata.getPartitions()) {
+ // TODO: this should ideally not create AddPartitionDesc per partition
+ AddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition);
+ descs.add(partsDesc);
+ }
+ return descs;
+ }
+
+ private AddPartitionDesc partitionDesc(Path fromPath,
+ ImportTableDesc tblDesc, Partition partition) throws SemanticException {
+ try {
+ AddPartitionDesc partsDesc =
+ new AddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(),
+ EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
+ partition.getSd().getLocation(), partition.getParameters());
+ AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
+ partDesc.setInputFormat(partition.getSd().getInputFormat());
+ partDesc.setOutputFormat(partition.getSd().getOutputFormat());
+ partDesc.setNumBuckets(partition.getSd().getNumBuckets());
+ partDesc.setCols(partition.getSd().getCols());
+ partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
+ partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
+ partDesc.setBucketCols(partition.getSd().getBucketCols());
+ partDesc.setSortCols(partition.getSd().getSortCols());
+ partDesc.setLocation(new Path(fromPath,
+ Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ partsDesc.setReplicationSpec(metadata.getReplicationSpec());
+ return partsDesc;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ @Override
+ public ReplicationSpec replicationSpec() {
+ return metadata.getReplicationSpec();
+ }
+
+ @Override
+ public EventType eventType() {
+ return EventType.Table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/92f764e0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
new file mode 100644
index 0000000..bab64ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -0,0 +1,129 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+public class LoadDatabase {
+
+ final Context context;
+ final TaskTracker tracker;
+
+ private final DatabaseEvent event;
+ private final String dbNameToLoadIn;
+
+ public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
+ TaskTracker loadTaskTracker) {
+ this.context = context;
+ this.event = event;
+ this.dbNameToLoadIn = dbNameToLoadIn;
+ this.tracker = new TaskTracker(loadTaskTracker);
+ }
+
+ public TaskTracker tasks() throws SemanticException {
+ try {
+ Database dbInMetadata = readDbMetadata();
+ Task<? extends Serializable> dbRootTask = existEmptyDb(dbInMetadata.getName())
+ ? alterDbTask(dbInMetadata, context.hiveConf)
+ : createDbTask(dbInMetadata);
+ tracker.addTask(dbRootTask);
+ return tracker;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ Database readDbMetadata() throws SemanticException {
+ return event.dbInMetadata(dbNameToLoadIn);
+ }
+
+ private Task<? extends Serializable> createDbTask(Database dbObj) {
+ CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+ createDbDesc.setName(dbObj.getName());
+ createDbDesc.setComment(dbObj.getDescription());
+
+ /*
+ explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going
+ to run multiple times and explicit logic is in place which prevents updates to tables when db level
+ last repl id is set and we create a AlterDatabaseTask at the end of processing a database.
+ */
+ Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
+ parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ createDbDesc.setDatabaseProperties(parameters);
+ // note that we do not set location - for repl load, we want that auto-created.
+ createDbDesc.setIfNotExists(false);
+ // If it exists, we want this to be an error condition. Repl Load is not intended to replace a
+ // db.
+ // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on.
+ DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), createDbDesc);
+ return TaskFactory.get(work, context.hiveConf);
+ }
+
+ private static Task<? extends Serializable> alterDbTask(Database dbObj, HiveConf hiveConf) {
+ AlterDatabaseDesc alterDbDesc =
+ new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null);
+ DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc);
+ return TaskFactory.get(work, hiveConf);
+ }
+
+ private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException {
+ Database db = context.hiveDb.getDatabase(dbName);
+ if (db == null) {
+ return false;
+ }
+ List<String> allTables = context.hiveDb.getAllTables(dbName);
+ List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*");
+ if (allTables.isEmpty() && allFunctions.isEmpty()) {
+ return true;
+ }
+ throw new InvalidOperationException(
+ "Database " + db.getName() + " is not empty. One or more tables/functions exist.");
+ }
+
+ public static class AlterDatabase extends LoadDatabase {
+
+ public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn,
+ TaskTracker loadTaskTracker) {
+ super(context, event, dbNameToLoadIn, loadTaskTracker);
+ }
+
+ @Override
+ public TaskTracker tasks() throws SemanticException {
+ tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf));
+ return tracker;
+ }
+ }
+}