Posted to commits@hive.apache.org by an...@apache.org on 2020/05/29 10:08:29 UTC
[hive] branch master updated: HIVE-23353: Atlas metadata
replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1c1336f HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)
1c1336f is described below
commit 1c1336f712c9a9a492c3c4e32993758026630484
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Fri May 29 15:38:17 2020 +0530
HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 10 +
.../TestReplicationScenariosAcrossInstances.java | 145 +++++++++++++++
pom.xml | 2 +
ql/if/queryplan.thrift | 4 +-
ql/pom.xml | 50 +++++
ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 10 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.h | 4 +-
.../apache/hadoop/hive/ql/plan/api/StageType.java | 8 +-
ql/src/gen/thrift/gen-php/Types.php | 4 +
ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 6 +
ql/src/gen/thrift/gen-rb/queryplan_types.rb | 6 +-
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 6 +
.../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 204 +++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/AtlasDumpWork.java | 61 ++++++
.../hadoop/hive/ql/exec/repl/AtlasLoadTask.java | 137 ++++++++++++++
.../hadoop/hive/ql/exec/repl/AtlasLoadWork.java | 58 ++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 19 ++
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 20 +-
.../hive/ql/exec/repl/atlas/AtlasReplInfo.java | 101 ++++++++++
.../ql/exec/repl/atlas/AtlasRequestBuilder.java | 192 +++++++++++++++++++
.../hive/ql/exec/repl/atlas/AtlasRestClient.java | 43 +++++
.../ql/exec/repl/atlas/AtlasRestClientBuilder.java | 97 ++++++++++
.../ql/exec/repl/atlas/AtlasRestClientImpl.java | 175 ++++++++++++++++++
.../ql/exec/repl/atlas/NoOpAtlasRestClient.java | 57 ++++++
.../hive/ql/exec/repl/atlas/RetryingClient.java | 92 ++++++++++
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 23 +++
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 31 ++++
27 files changed, 1556 insertions(+), 9 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 281c4e2..abd12c9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -539,6 +539,16 @@ public class HiveConf extends Configuration {
true,
"This configuration will add a deny policy on the target database for all users except hive"
+ " to avoid any update to the target database"),
+ REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false,
+ "Indicates whether Atlas metadata should be replicated along with Hive data and metadata."),
+ REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null,
+ "Atlas endpoint of the current cluster that the Hive database is being replicated from/to."),
+ REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null,
+ "Name of the target Hive database to which Atlas metadata of the source Hive database is replicated."),
+ REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null,
+ "Name of the source cluster for the replication."),
+ REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
+ "Name of the target cluster for the replication."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 65f7303..d7b360c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -43,11 +43,18 @@ import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1538,4 +1545,142 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
}
}
+
+ // Tests only the configs and verifies there is no impact on existing replication.
+ @Test
+ public void testAtlasReplication() throws Throwable {
+ Map<String, String> confMap = defaultAtlasConfMap();
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+ verifyAtlasMetadataPresent();
+
+ confMap.remove("hive.repl.atlas.replicatedto");
+ replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"acid_table", "table1"})
+ .run("select * from table1")
+ .verifyResults(new String[] {"1", "2"});
+ }
+
+ @Test
+ public void testAtlasMissingConfigs() throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)");
+ Map<String, String> confMap = new HashMap<>();
+ confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+ confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
+ ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true);
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true);
+ confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true);
+ confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+ primary.dump(primaryDbName, getAtlasClause(confMap));
+ verifyAtlasMetadataPresent();
+
+ confMap.clear();
+ confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+ confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
+ ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false);
+ confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+ ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false);
+ confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+ primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap));
+ }
+
+ private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable {
+ try {
+ if (dump) {
+ primary.dump(primaryDbName, atlasClause);
+ } else {
+ primary.load(replicatedDbName, primaryDbName, atlasClause);
+ }
+ } catch (MalformedURLException e) {
+ return;
+ }
+ Assert.fail("Atlas endpoint is invalid and but test didn't fail:" + endpoint);
+ }
+
+ private void verifyAtlasMetadataPresent() throws IOException {
+ Path dbReplDir = new Path(primary.repldDir,
+ Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name())));
+ FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf());
+ assertTrue(fs.exists(dbReplDir));
+ FileStatus[] dumpRoots = fs.listStatus(dbReplDir);
+ assertEquals(1, dumpRoots.length);
+ Path dumpRoot = dumpRoots[0].getPath();
+ assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, ReplUtils.REPL_HIVE_BASE_DIR)));
+ Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR);
+ assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot));
+ assertTrue("Atlas export file doesn't exist",
+ fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME)));
+ assertTrue("Atlas dump metadata doesn't exist",
+ fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)));
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new InputStreamReader(
+ fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset()));
+ String[] lineContents = br.readLine().split("\t", 5);
+ assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]);
+ assertEquals(0, Long.parseLong(lineContents[1]));
+ } finally {
+ if (br != null) {
+ br.close();
+ }
+ }
+ }
+
+ private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable {
+ try {
+ if (dump) {
+ primary.dump(primaryDbName, clause);
+ } else {
+ primary.load(replicatedDbName, primaryDbName, clause);
+ }
+ Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
+ } catch (SemanticException e) {
+ assertEquals(conf + " is mandatory config for Atlas metadata replication", e.getMessage());
+ }
+ }
+
+ private Map<String, String> defaultAtlasConfMap() {
+ Map<String, String> confMap = new HashMap<>();
+ confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+ confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+ confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
+ confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+ confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+ return confMap;
+ }
+
+ private List<String> getAtlasClause(Map<String, String> confMap) {
+ List<String> confList = new ArrayList<>();
+ for (Map.Entry<String, String> entry:confMap.entrySet()) {
+ confList.add(quote(entry.getKey()) + "=" + quote(entry.getValue()));
+ }
+ return confList;
+ }
+
+ private String quote(String str) {
+ return "'" + str + "'";
+ }
}
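For reference, verifyAtlasMetadataPresent above pins down the dump layout this commit produces. Directory and file names are shown by their constants, since their literal values live in ReplUtils and are not visible in this diff:

    <hive.repl.rootdir>/<base64(dbName)>/<dumpRoot>/
        <REPL_HIVE_BASE_DIR>/                  -- the regular Hive dump
        <REPL_ATLAS_BASE_DIR>/
            <REPL_ATLAS_EXPORT_FILE_NAME>      -- streamed Atlas export
            <EximUtil.METADATA_NAME>           -- "<src fs.defaultFS>\t<change marker>"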
diff --git a/pom.xml b/pom.xml
index b4b41ea..cedcb2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
<apache-directory-server.version>1.5.7</apache-directory-server.version>
<!-- Include arrow for LlapOutputFormatService -->
<arrow.version>0.10.0</arrow.version>
+ <atlas.client.version>2.0.0</atlas.client.version>
<avatica.version>1.12.0</avatica.version>
<avro.version>1.8.2</avro.version>
<calcite.version>1.21.0</calcite.version>
@@ -123,6 +124,7 @@
<commons-codec.version>1.7</commons-codec.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-compress.version>1.19</commons-compress.version>
+ <commons-configuration.version>1.10</commons-configuration.version>
<commons-exec.version>1.1</commons-exec.version>
<commons-io.version>2.6</commons-io.version>
<commons-lang3.version>3.9</commons-lang3.version>
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index d7d7cdc..91bff61 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -109,7 +109,9 @@ enum StageType {
SCHEDULED_QUERY_MAINT,
ACK,
RANGER_DUMP,
- RANGER_LOAD
+ RANGER_LOAD,
+ ATLAS_DUMP,
+ ATLAS_LOAD
}
struct Stage {
diff --git a/ql/pom.xml b/ql/pom.xml
index 816b400..689d2d1 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -38,6 +38,56 @@
<!-- intra-project -->
<!-- used for vector code-gen -->
<dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${atlas.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-common</artifactId>
+ <version>${atlas.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-intg</artifactId>
+ <version>${atlas.client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>${commons-configuration.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-vector-code-gen</artifactId>
<version>${project.version}</version>
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 00847d5..c7338e1 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -126,7 +126,9 @@ int _kStageTypeValues[] = {
StageType::SCHEDULED_QUERY_MAINT,
StageType::ACK,
StageType::RANGER_DUMP,
- StageType::RANGER_LOAD
+ StageType::RANGER_LOAD,
+ StageType::ATLAS_DUMP,
+ StageType::ATLAS_LOAD
};
const char* _kStageTypeNames[] = {
"CONDITIONAL",
@@ -149,9 +151,11 @@ const char* _kStageTypeNames[] = {
"SCHEDULED_QUERY_MAINT",
"ACK",
"RANGER_DUMP",
- "RANGER_LOAD"
+ "RANGER_LOAD",
+ "ATLAS_DUMP",
+ "ATLAS_LOAD
};
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(23, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
Adjacency::~Adjacency() throw() {
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index b3d9cf7..1e8c702 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -103,7 +103,9 @@ struct StageType {
SCHEDULED_QUERY_MAINT = 17,
ACK = 18,
RANGER_DUMP = 19,
- RANGER_LOAD = 20
+ RANGER_LOAD = 20,
+ ATLAS_DUMP = 21,
+ ATLAS_LOAD = 22
};
};
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 31c3429..eba0230 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -32,7 +32,9 @@ public enum StageType implements org.apache.thrift.TEnum {
SCHEDULED_QUERY_MAINT(17),
ACK(18),
RANGER_DUMP(19),
- RANGER_LOAD(20);
+ RANGER_LOAD(20),
+ ATLAS_DUMP(21),
+ ATLAS_LOAD(22);
private final int value;
@@ -95,6 +97,10 @@ public enum StageType implements org.apache.thrift.TEnum {
return RANGER_DUMP;
case 20:
return RANGER_LOAD;
+ case 21:
+ return ATLAS_DUMP;
+ case 22:
+ return ATLAS_LOAD;
default:
return null;
}
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index d805d21..7cb3bea 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -125,6 +125,8 @@ final class StageType {
const ACK = 18;
const RANGER_DUMP = 19;
const RANGER_LOAD = 20;
+ const ATLAS_DUMP = 21;
+ const ATLAS_LOAD = 22;
static public $__names = array(
0 => 'CONDITIONAL',
1 => 'COPY',
@@ -147,6 +149,8 @@ final class StageType {
18 => 'ACK',
19 => 'RANGER_DUMP',
20 => 'RANGER_LOAD',
+ 21 => 'ATLAS_DUMP',
+ 22 => 'ATLAS_LOAD',
);
}
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 33a53ce..39a20c3 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -172,6 +172,8 @@ class StageType:
ACK = 18
RANGER_DUMP = 19
RANGER_LOAD = 20
+ ATLAS_DUMP = 21
+ ATLAS_LOAD = 22
_VALUES_TO_NAMES = {
0: "CONDITIONAL",
@@ -195,6 +197,8 @@ class StageType:
18: "ACK",
19: "RANGER_DUMP",
20: "RANGER_LOAD",
+ 21: "ATLAS_DUMP",
+ 22: "ATLAS_LOAD",
}
_NAMES_TO_VALUES = {
@@ -219,6 +223,8 @@ class StageType:
"ACK": 18,
"RANGER_DUMP": 19,
"RANGER_LOAD": 20,
+ "ATLAS_DUMP": 21,
+ "ATLAS_LOAD": 22,
}
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index d31ba3c..0e827b2 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -82,8 +82,10 @@ module StageType
ACK = 18
RANGER_DUMP = 19
RANGER_LOAD = 20
- VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD"}
- VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD]).freeze
+ ATLAS_DUMP = 21
+ ATLAS_LOAD = 22
+ VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD", 21 => "ATLAS_DUMP", 22 => "ATLAS_LOAD"}
+ VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD, ATLAS_DUMP, ATLAS_LOAD]).freeze
end
class Adjacency
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index f0e5461..fd2715d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hive.ql.ddl.DDLTask;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
@@ -120,6 +124,8 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class));
taskvec.add(new TaskTuple<RangerDumpWork>(RangerDumpWork.class, RangerDumpTask.class));
taskvec.add(new TaskTuple<RangerLoadWork>(RangerLoadWork.class, RangerLoadTask.class));
+ taskvec.add(new TaskTuple<AtlasDumpWork>(AtlasDumpWork.class, AtlasDumpTask.class));
+ taskvec.add(new TaskTuple<AtlasLoadWork>(AtlasLoadWork.class, AtlasLoadTask.class));
taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
new file mode 100644
index 0000000..26cdc6b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+ private static final long serialVersionUID = 1L;
+ private transient AtlasRestClient atlasRestClient;
+
+ @Override
+ public int execute() {
+ try {
+ AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
+ LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+ atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+ atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
+ .getClient(atlasReplInfo.getConf());
+ AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+ String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(),
+ atlasReplInfo.getSrcDB());
+ long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+ dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+ createDumpMetadata(atlasReplInfo, currentModifiedTime);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Exception while dumping atlas metadata", e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ }
+
+ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+ String errorFormat = "%s is mandatory config for Atlas metadata replication";
+ //Also validates URL for endpoint.
+ String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
+ .toString();
+ String tgtDB = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, conf, errorFormat);
+ String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
+ String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), tgtDB, srcCluster,
+ tgtCluster, work.getStagingDir(), conf);
+ atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ long lastTimeStamp = work.isBootstrap() ? 0L : lastStoredTimeStamp();
+ atlasReplInfo.setTimeStamp(lastTimeStamp);
+ return atlasReplInfo;
+ }
+
+ private long lastStoredTimeStamp() throws SemanticException {
+ Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+ BufferedReader br = null;
+ try {
+ FileSystem fs = prevMetadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+ String line = br.readLine();
+ if (line == null) {
+ throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
+ }
+ String[] lineContents = line.split("\t", 5);
+ return Long.parseLong(lineContents[1]);
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+ AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+ long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+ ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+ LOG.debug("Current timestamp is: {}", ret);
+ return ret;
+ }
+
+ private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
+ throws SemanticException {
+ InputStream inputStream = null;
+ try {
+ AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+ atlasReplInfo.getSrcCluster());
+ inputStream = atlasRestClient.exportData(exportRequest);
+ FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
+ Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+ long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
+ LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten);
+ } catch (SemanticException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName,
+ String srcDb)
+ throws SemanticException {
+ AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
+ Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
+ if (entries == null || entries.isEmpty()) {
+ throw new SemanticException("Could find entries in objectId for:" + clusterName);
+ }
+ Map.Entry<String, Object> item = entries.iterator().next();
+ String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue());
+ if (guid == null || guid.isEmpty()) {
+ throw new SemanticException("Entity not found:" + objectId);
+ }
+ return guid;
+ }
+
+ private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+ Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
+ List<List<String>> listValues = new ArrayList<>();
+ listValues.add(
+ Arrays.asList(
+ atlasReplInfo.getSrcFsUri(),
+ String.valueOf(lastModifiedTime)
+ )
+ );
+ Utils.writeOutput(listValues, dumpFile, conf, true);
+ LOG.debug("Stored metadata for Atlas dump at:", dumpFile.toString());
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.ATLAS_DUMP;
+ }
+
+ @Override
+ public String getName() {
+ return "ATLAS_DUMP";
+ }
+
+ @Override
+ public boolean canExecuteInParallel() {
+ return false;
+ }
+}
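A note on the metadata file AtlasDumpTask writes: createDumpMetadata persists a single tab-separated line whose first field is the source fs.defaultFS and whose second field is the current change marker, e.g. (values illustrative):

    hdfs://source-nn:8020<TAB>1590744600000

On the next incremental dump, lastStoredTimeStamp reads the second field back and feeds it into the export request as FETCH_TYPE_INCREMENTAL_CHANGE_MARKER; on the load side, getStoredFsUri in AtlasLoadTask reads the first field to drive the location-rewrite transforms.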
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
new file mode 100644
index 0000000..3344152
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasDumpWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final Path stagingDir;
+ private final boolean bootstrap;
+ private final Path prevAtlasDumpDir;
+
+
+ public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) {
+ this.srcDB = srcDB;
+ this.stagingDir = stagingDir;
+ this.bootstrap = bootstrap;
+ this.prevAtlasDumpDir = prevAtlasDumpDir;
+ }
+
+ public boolean isBootstrap() {
+ return bootstrap;
+ }
+
+ public Path getPrevAtlasDumpDir() {
+ return prevAtlasDumpDir;
+ }
+
+ public String getSrcDB() {
+ return srcDB;
+ }
+
+ public Path getStagingDir() {
+ return stagingDir;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
new file mode 100644
index 0000000..fceded5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
+
+ @Override
+ public int execute() {
+ try {
+ AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
+ LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
+ atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+ int importCount = importAtlasMetadata(atlasReplInfo);
+ LOG.info("Atlas entities import count {}", importCount);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Exception while loading atlas metadata", e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ }
+
+ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+ String errorFormat = "%s is mandatory config for Atlas metadata replication";
+ //Also validates URL for endpoint.
+ String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
+ .toString();
+ String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
+ String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(),
+ srcCluster, tgtCluster, work.getStagingDir(), conf);
+ atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+ atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ return atlasReplInfo;
+ }
+
+ private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
+ Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
+ BufferedReader br = null;
+ try {
+ FileSystem fs = metadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+ String line = br.readLine();
+ if (line == null) {
+ throw new SemanticException("Could not read stored src FS Uri from atlas metadata file");
+ }
+ String[] lineContents = line.split("\t", 5);
+ return lineContents[0];
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+ AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+ AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(
+ atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
+ atlasReplInfo.getSrcCluster(), atlasReplInfo.getTgtCluster(),
+ atlasReplInfo.getSrcFsUri(), atlasReplInfo.getTgtFsUri());
+ AtlasImportResult result = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
+ .getClient(atlasReplInfo.getConf()).importData(importRequest, atlasReplInfo);
+ if (result == null || result.getProcessedEntities() == null) {
+ LOG.info("No Atlas entity found");
+ return 0;
+ }
+ return result.getProcessedEntities().size();
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.ATLAS_LOAD;
+ }
+
+ @Override
+ public String getName() {
+ return "ATLAS_LOAD";
+ }
+
+ @Override
+ public boolean canExecuteInParallel() {
+ return false;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
new file mode 100644
index 0000000..4dc1ea8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final String tgtDB;
+ private final Path stagingDir;
+
+ public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) {
+ this.srcDB = srcDB;
+ this.tgtDB = tgtDB;
+ this.stagingDir = stagingDir;
+ }
+
+ public static long getSerialVersionUID() {
+ return serialVersionUID;
+ }
+
+ public String getSrcDB() {
+ return srcDB;
+ }
+
+ public String getTgtDB() {
+ return tgtDB;
+ }
+
+ public Path getStagingDir() {
+ return stagingDir;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 516b6f4..046b6a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -147,6 +147,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
work.setCurrentDumpPath(currentDumpPath);
+ if (shouldDumpAtlasMetadata()) {
+ addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath);
+ LOG.info("Added task to dump atlas metadata.");
+ }
if (shouldDumpAuthorizationMetadata()) {
initiateAuthorizationDumpTask();
}
@@ -195,6 +199,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA);
}
+ private boolean shouldDumpAtlasMetadata() {
+ return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
+ }
+
private Path getEncodedDumpRootPath() throws UnsupportedEncodingException {
return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
@@ -228,6 +236,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
+ private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) {
+ Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR);
+ Path prevAtlasDumpDir = prevHiveDumpDir == null ? null
+ : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
+ AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir);
+ Task<?> atlasDumpTask = TaskFactory.get(atlasDumpWork, conf);
+ childTasks = new ArrayList<>();
+ childTasks.add(atlasDumpTask);
+ }
+
private void finishRemainingTasks() throws SemanticException {
Path dumpAckFile = new Path(work.getCurrentDumpPath(),
ReplUtils.REPL_HIVE_BASE_DIR + File.separator
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7a30962..792e331 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -107,6 +107,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
work.setRootTask(this);
this.parentTasks = null;
+ if (shouldLoadAtlasMetadata()) {
+ addAtlasLoadTask();
+ }
if (shouldLoadAuthorizationMetadata()) {
initiateAuthorizationLoadTask();
}
@@ -141,8 +144,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
childTasks.add(rangerLoadTask);
} else {
throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
- + " not supported for replication ");
+ + " not supported for replication ");
+ }
+ }
+
+ private void addAtlasLoadTask() throws HiveException {
+ Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
+ LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
+ AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir);
+ Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
+ if (childTasks == null) {
+ childTasks = new ArrayList<>();
}
+ childTasks.add(atlasLoadTask);
+ }
+
+ private boolean shouldLoadAtlasMetadata() {
+ return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
}
private int executeBootStrapLoad() throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
new file mode 100644
index 0000000..b0923d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Atlas metadata replication info holder.
+ */
+public class AtlasReplInfo {
+ private final String srcDB;
+ private final String tgtDB;
+ private final String srcCluster;
+ private final String tgtCluster;
+ private final Path stagingDir;
+ private final HiveConf conf;
+ private final String atlasEndpoint;
+ private String srcFsUri;
+ private String tgtFsUri;
+ private long timeStamp;
+
+ public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String srcCluster,
+ String tgtCluster, Path stagingDir, HiveConf conf) {
+ this.atlasEndpoint = atlasEndpoint;
+ this.srcDB = srcDB;
+ this.tgtDB = tgtDB;
+ this.srcCluster = srcCluster;
+ this.tgtCluster = tgtCluster;
+ this.stagingDir = stagingDir;
+ this.conf = conf;
+ }
+
+ public String getSrcDB() {
+ return srcDB;
+ }
+
+ public String getTgtDB() {
+ return tgtDB;
+ }
+
+ public String getSrcCluster() {
+ return srcCluster;
+ }
+
+ public String getTgtCluster() {
+ return tgtCluster;
+ }
+
+ public Path getStagingDir() {
+ return stagingDir;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public String getAtlasEndpoint() {
+ return atlasEndpoint;
+ }
+
+ public String getSrcFsUri() {
+ return srcFsUri;
+ }
+
+ public void setSrcFsUri(String srcFsUri) {
+ this.srcFsUri = srcFsUri;
+ }
+
+ public String getTgtFsUri() {
+ return tgtFsUri;
+ }
+
+ public void setTgtFsUri(String tgtFsUri) {
+ this.tgtFsUri = tgtFsUri;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
new file mode 100644
index 0000000..3c72f8f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class);
+ public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+ static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+ static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+ private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+ private static final String ATTRIBUTE_NAME_NAME = ".name";
+ private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+ private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom";
+ private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+ private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME;
+ private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME;
+ private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION;
+ private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION;
+
+ private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+ private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+ public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) {
+ List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer);
+ Map<String, Object> options = getOptions(atlasReplInfo);
+ return createRequest(itemsToExport, options);
+ }
+
+ private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {
+ List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
+ final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB());
+ atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+ return atlasObjectIds;
+ }
+
+ private AtlasExportRequest createRequest(final List<AtlasObjectId> itemsToExport,
+ final Map<String, Object> options) {
+ AtlasExportRequest request = new AtlasExportRequest() {
+ {
+ setItemsToExport(itemsToExport);
+ setOptions(options);
+ }
+ };
+ LOG.debug("createRequest: {}" + request);
+ return request;
+ }
+
+ private Map<String, Object> getOptions(AtlasReplInfo atlasReplInfo) {
+ String targetCluster = atlasReplInfo.getTgtCluster();
+ Map<String, Object> options = new HashMap<>();
+ options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
+ options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp());
+ options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true);
+ if (targetCluster != null && !targetCluster.isEmpty()) {
+ options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster);
+ }
+ return options;
+ }
+
+ public AtlasObjectId getItemToExport(String srcCluster, String srcDB) {
+ final String qualifiedName = getQualifiedName(srcCluster, srcDB);
+ return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+ }
+
+ private String getQualifiedName(String clusterName, String srcDb) {
+ String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, srcDb.toLowerCase(), clusterName);
+ LOG.debug("Atlas getQualifiedName: {}", qualifiedName);
+ return qualifiedName;
+ }
+
+ public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet,
+ String sourceClusterName, String targetClusterName,
+ String sourceFsEndpoint, String targetFsEndpoint) {
+ AtlasImportRequest request = new AtlasImportRequest();
+ addTransforms(request.getOptions(),
+ sourceClusterName, targetClusterName,
+ sourceDataSet, targetDataSet,
+ sourceFsEndpoint, targetFsEndpoint);
+ addReplicatedFrom(request.getOptions(), sourceClusterName);
+ LOG.debug("Atlas metadata import request: {}" + request);
+ return request;
+ }
+
+ private void addTransforms(Map<String, String> options, String srcClusterName,
+ String tgtClusterName, String sourceDataSet, String targetDataSet,
+ String sourcefsEndpoint, String targetFsEndpoint) {
+ List<AttributeTransform> transforms = new ArrayList<>();
+ String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName);
+ addClassificationTransform(transforms,
+ String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName));
+ addClearReplicationAttributesTransform(transforms);
+ addClusterRenameTransform(transforms, srcClusterName, tgtClusterName);
+ if (!sourceDataSet.equals(targetDataSet)) {
+ addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet);
+ }
+ addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint);
+ options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms));
+ }
+
+ private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) {
+ transforms.add(create(
+ HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+ HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+ )
+ );
+ transforms.add(create(
+ HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+ HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+ )
+ );
+ }
+
+ private void addDataSetRenameTransform(List<AttributeTransform> transforms,
+ String sourceDataSet, String targetDataSet) {
+ transforms.add(create(
+ HIVE_DB_NAME, "EQUALS: " + sourceDataSet,
+ HIVE_DB_NAME, "SET: " + targetDataSet));
+ }
+
+ private void addClusterRenameTransform(List<AttributeTransform> transforms,
+ String srcClusterName, String tgtClustername) {
+ transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName,
+ HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername));
+ }
+
+ private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) {
+ options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName);
+ }
+
+ private void addClassificationTransform(List<AttributeTransform> transforms, String classificationName) {
+ transforms.add(create("__entity", "topLevel: ",
+ "__entity", "ADD_CLASSIFICATION: " + classificationName));
+ }
+
+ private String sanitizeForClassificationName(String s) {
+ if (StringUtils.isEmpty(s)) {
+ return s;
+ }
+ return s.replace('-', '_').replace(' ', '_');
+ }
+
+ private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) {
+ Map<String, String> actions = new HashMap<>();
+ actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:");
+ actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:");
+
+ transforms.add(new AttributeTransform(null, actions));
+ }
+
+ private AttributeTransform create(String conditionLhs, String conditionRhs,
+ String actionLhs, String actionRhs) {
+ return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs),
+ Collections.singletonMap(actionLhs, actionRhs));
+ }
+}
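A minimal usage sketch of this builder, matching how AtlasDumpTask and AtlasLoadTask drive it (cluster names and FS endpoints are placeholders):

    AtlasRequestBuilder builder = new AtlasRequestBuilder();
    // Dump side: one hive_db object id, qualified as "<db>@<cluster>"
    AtlasObjectId dbToExport = builder.getItemToExport("cluster0", "srcdb");
    // Load side: transforms tag entities, rename cluster/db, and rewrite FS locations
    AtlasImportRequest importRequest = builder.createImportRequest(
        "srcdb", "tgtdb", "cluster0", "cluster1",
        "hdfs://src-nn:8020", "hdfs://tgt-nn:8020");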
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
new file mode 100644
index 0000000..dd72f83
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.InputStream;
+
+/**
+ * Client interface for the subset of Atlas' REST APIs used by replication.
+ */
+public interface AtlasRestClient {
+
+ InputStream exportData(AtlasExportRequest request) throws Exception;
+
+ AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception;
+
+ AtlasServer getServer(String endpoint) throws SemanticException;
+
+ String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName)
+ throws SemanticException;
+
+ boolean getStatus() throws SemanticException;
+}
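A rough end-to-end flow against this interface might look as follows; the
endpoint and the construction of the request objects and AtlasReplInfo are
assumptions, not part of this commit:

    AtlasRestClient client =
        new AtlasRestClientBuilder("http://atlas-host:21000").getClient(hiveConf);
    if (client.getStatus()) {
      // On the source cluster: export Atlas metadata as a stream to be staged.
      InputStream exportStream = client.exportData(exportRequest);
      // On the target cluster: replay the staged export file.
      AtlasImportResult result = client.importData(importRequest, atlasReplInfo);
    }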
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
new file mode 100644
index 0000000..37b623f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+ private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+ private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+ private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+ private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+ private static final String URL_SEPARATOR = ",";
+
+ private UserGroupInformation userGroupInformation;
+ protected String incomingUrl;
+ protected String[] baseUrls;
+
+ public AtlasRestClientBuilder(String urls) {
+ this.incomingUrl = urls;
+ this.baseUrls = urls.split(URL_SEPARATOR);
+ }
+
+ public AtlasRestClient getClient(HiveConf conf) throws SemanticException {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+ return new NoOpAtlasRestClient();
+ }
+ return create();
+ }
+
+ private AtlasRestClient create() throws SemanticException {
+ if (baseUrls == null || baseUrls.length == 0) {
+ throw new SemanticException("baseUrls is not set.");
+ }
+ setUGInfo();
+ initializeAtlasApplicationProperties();
+ AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
+ this.userGroupInformation.getShortUserName(), baseUrls);
+ return new AtlasRestClientImpl(clientV2);
+ }
+
+ private AtlasRestClientBuilder setUGInfo() throws SemanticException {
+ try {
+ this.userGroupInformation = UserGroupInformation.getLoginUser();
+ LOG.info("AuthStrategy: Kerberos : urls: {} : userGroupInformation: {}", baseUrls, userGroupInformation);
+ } catch (Exception e) {
+ throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+ }
+ return this;
+ }
+
+ private void initializeAtlasApplicationProperties() throws SemanticException {
+ try {
+ Properties props = new Properties();
+ props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1");
+ props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0");
+ props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl);
+ props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true");
+ ApplicationProperties.set(ConfigurationConverter.getConfiguration(props));
+ } catch (AtlasException e) {
+ throw new SemanticException(e);
+ }
+ }
+}
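Usage sketch: the builder accepts a single endpoint or a comma-separated
list for HA, and with HIVE_IN_TEST_REPL set it short-circuits to the no-op
client (the endpoint URLs below are hypothetical):

    HiveConf conf = new HiveConf();
    AtlasRestClient client =
        new AtlasRestClientBuilder("http://atlas1:21000,http://atlas2:21000").getClient(conf);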
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
new file mode 100644
index 0000000..c8d738e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of AtlasRestClient; wraps Atlas' REST APIs with retry and timeout handling.
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+ private final AtlasClientV2 clientV2;
+
+ public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+ this.clientV2 = clientV2;
+ }
+
+ private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final Future<T> future = executor.submit(callable);
+ executor.shutdown();
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ throw e;
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof Error) {
+ throw (Error) t;
+ } else if (t instanceof Exception) {
+ throw (Exception) t;
+ } else {
+ throw new IllegalStateException(t);
+ }
+ }
+ }
+
+ public InputStream exportData(AtlasExportRequest request) throws Exception {
+ LOG.debug("exportData: {}" + request);
+ return invokeWithRetry(() -> clientV2.exportData(request), null);
+ }
+
+ public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception {
+ AtlasImportResult defaultResult = getDefaultAtlasImportResult(request);
+ Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+ FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf());
+ if (!fs.exists(exportFilePath)) {
+ return defaultResult;
+ }
+ LOG.debug("Atlas import data request: {}" + request);
+ return invokeWithRetry(() -> {
+ try (InputStream is = fs.open(exportFilePath)) {
+ return clientV2.importData(request, is);
+ }
+ }, defaultResult);
+ }
+
+ private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) {
+ return new AtlasImportResult(request, "", "", "", 0L);
+ }
+
+ public AtlasServer getServer(String endpoint) throws SemanticException {
+ try {
+ return clientV2.getServer(endpoint);
+ } catch (AtlasServiceException e) {
+ int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
+ if (statusCode != NOT_FOUND.getStatusCode()) {
+ throw new SemanticException("Exception while getServer ", e.getCause());
+ }
+ LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage());
+ }
+ return null;
+ }
+
+ public String getEntityGuid(final String entityType,
+ final String attributeName, final String qualifiedName) throws SemanticException {
+ int entityApiTimeOut = 10;
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(attributeName, qualifiedName);
+
+ try {
+ AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout(
+ () -> clientV2.getEntityByAttribute(entityType, attributes),
+ entityApiTimeOut, TimeUnit.SECONDS);
+
+ if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) {
+ LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}",
+ entityType, attributeName, qualifiedName);
+ return null;
+ }
+ return entityWithExtInfo.getEntity().getGuid();
+ } catch (AtlasServiceException e) {
+ int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
+ if (statusCode != NOT_FOUND.getStatusCode()) {
+ throw new SemanticException("Exception while getEntityGuid ", e.getCause());
+ }
+ LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}",
+ entityType, attributeName, qualifiedName, e.getMessage());
+ return null;
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ public boolean getStatus() throws SemanticException {
+ try {
+ return clientV2.isServerReady();
+ } catch (AtlasServiceException e) {
+ throw new SemanticException(e.getCause());
+ }
+ }
+}
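For illustration, a guid lookup through the implementation above. The type
name "hive_db", the attribute "qualifiedName" and its "db@cluster" format
follow Atlas's usual conventions for Hive entities, but are assumptions
here rather than something this commit defines:

    // Returns null (after a WARN) if the entity is missing or the
    // 10-second entity API timeout elapses.
    String guid = client.getEntityGuid("hive_db", "qualifiedName", "salesdb@srcCluster");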
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
new file mode 100644
index 0000000..59dcbf7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * Dummy implementation of AtlasRestClient, used in tests to bypass calls
+ * to a live Atlas service.
+ */
+public class NoOpAtlasRestClient implements AtlasRestClient {
+
+ public InputStream exportData(AtlasExportRequest request) {
+ return new ByteArrayInputStream("Dummy".getBytes(StandardCharsets.UTF_8));
+ }
+
+ public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) {
+ return new AtlasImportResult(request, "", "", "", 0L);
+ }
+
+ public AtlasServer getServer(String endpoint) {
+ return new AtlasServer();
+ }
+
+ public String getEntityGuid(final String entityType,
+ final String attributeName, final String qualifiedName) {
+ return UUID.randomUUID().toString();
+ }
+
+ public boolean getStatus() {
+ return true;
+ }
+}
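The stub is wired in through the builder: in tests, something like the
following sketch yields this client without touching a live Atlas service:

    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true);
    AtlasRestClient stub = new AtlasRestClientBuilder("http://unused:21000").getClient(conf);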
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
new file mode 100644
index 0000000..dbc065a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Implements retry logic for Atlas service calls.
+ */
+public class RetryingClient {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
+ private static final int MAX_RETRY_COUNT = 5;
+ private static final int PAUSE_DURATION_INCREMENT_IN_MS = 30 * 1000;
+ private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+ private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+ private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+
+ protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+ for (int currentRetryCount = 1; currentRetryCount <= MAX_RETRY_COUNT; currentRetryCount++) {
+ try {
+ LOG.debug("Retrying method: {}", func.getClass().getName(), null);
+ return func.call();
+ } catch (Exception e) {
+ if (processImportExportLockException(e, currentRetryCount)) {
+ continue;
+ }
+ if (processInvalidParameterException(e)) {
+ return null;
+ }
+ LOG.error("Failed to invoke {}", func.getClass().getName(), e);
+ throw new Exception(e);
+ }
+ }
+ return defaultReturnValue;
+ }
+
+ private boolean processInvalidParameterException(Exception e) {
+ if (e instanceof UniformInterfaceException) {
+ return true;
+ }
+ if (!(e instanceof AtlasServiceException)) {
+ return false;
+ }
+ if (e.getMessage() == null) {
+ return false;
+ }
+ return (e.getMessage().contains(ERROR_MESSAGE_NO_ENTITIES)
+ || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
+ }
+
+ private boolean processImportExportLockException(Exception e, int currentRetryCount) throws Exception {
+ if (!(e instanceof AtlasServiceException)) {
+ return false;
+ }
+ String excMessage = e.getMessage() == null ? "" : e.getMessage();
+ if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
+ try {
+ int pauseDuration = PAUSE_DURATION_INCREMENT_IN_MS * currentRetryCount;
+ LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", pauseDuration);
+ Thread.sleep(pauseDuration);
+ } catch (InterruptedException intEx) {
+ LOG.error("Pause wait interrupted!", intEx);
+ throw new Exception(intEx);
+ }
+ return true;
+ }
+ return false;
+ }
+}
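The loop above pauses 30s * attempt whenever Atlas reports an in-progress
import/export, so with the default of 5 attempts the worst case is
30+60+90+120+150 = 450s of waiting before the default value is returned.
A subclass drives it like this (a sketch; AtlasRestClientImpl above does
the same with its export/import calls):

    class ExportingClient extends RetryingClient {
      InputStream export(AtlasClientV2 client, AtlasExportRequest request) throws Exception {
        return invokeWithRetry(() -> client.exportData(request), null);
      }
    }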
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 543ceca..c0aadb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
@@ -82,6 +83,19 @@ public class ReplUtils {
// Root base directory name for ranger.
public static final String REPL_RANGER_BASE_DIR = "ranger";
+ // Root base directory name for atlas.
+ public static final String REPL_ATLAS_BASE_DIR = "atlas";
+
+ // Atlas meta data export file.
+ public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip";
+
+ // Config for hadoop default file system.
+ public static final String DEFAULT_FS_CONFIG = "fs.defaultFS";
+
+ // Cluster name separator, used when the cluster name also carries the data center name, e.g. dc$mycluster1.
+ public static final String CLUSTER_NAME_SEPARATOR = "$";
+
// Name of the directory which stores the list of tables included in the policy in case of table level replication.
// One file per database, named after the db name. The directory is not created for db level replication.
public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
@@ -184,6 +198,15 @@ public class ReplUtils {
return false;
}
+ public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat)
+ throws SemanticException {
+ String val = hiveConf.get(configParam);
+ if (StringUtils.isEmpty(val)) {
+ throw new SemanticException(String.format(errorMsgFormat, configParam));
+ }
+ return val;
+ }
+
public static boolean isTableMigratingToTransactional(HiveConf conf,
org.apache.hadoop.hive.metastore.api.Table tableObj)
throws TException, IOException {
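A usage sketch for getNonEmpty; the config key is one of the new
replication configs, and the message format (consumed via String.format
with the key as its argument) is hypothetical:

    String endpoint = ReplUtils.getNonEmpty("hive.repl.atlas.endpoint", conf,
        "Config '%s' is required when hive.repl.include.atlas.metadata is true");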
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 44320a5..154f028 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.dump;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -57,6 +59,7 @@ import java.util.UUID;
public class Utils {
private static Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+ private static final int DEF_BUF_SIZE = 8 * 1024;
public enum ReplDumpState {
IDLE, ACTIVE
@@ -97,6 +100,34 @@ public class Utils {
}
}
+ public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws SemanticException {
+ Retry<Long> retriable = new Retry<Long>(IOException.class) {
+ @Override
+ public Long execute() throws IOException {
+ try (FSDataOutputStream fos = fs.create(exportFilePath)) {
+ byte[] buffer = new byte[DEF_BUF_SIZE];
+ int bytesRead;
+ while ((bytesRead = is.read(buffer)) != -1) {
+ fos.write(buffer, 0, bytesRead);
+ }
+ return fos.getPos();
+ }
+ }};
+ try {
+ return retriable.run();
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
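Sketch of the intended use of writeFile: persisting an Atlas export stream
into the replication staging directory (the staging path, client and
request variables are hypothetical):

    Path exportFile = new Path(stagingDir, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
    FileSystem fs = FileSystem.get(exportFile.toUri(), hiveConf);
    try (InputStream is = client.exportData(exportRequest)) {
      long bytesWritten = Utils.writeFile(fs, exportFile, is);
    }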
public static void writeOutput(String content, Path outputFile, HiveConf hiveConf)
throws SemanticException {
Retry<Void> retriable = new Retry<Void>(IOException.class) {