Posted to commits@hive.apache.org by an...@apache.org on 2020/05/29 10:08:29 UTC

[hive] branch master updated: HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c1336f  HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)
1c1336f is described below

commit 1c1336f712c9a9a492c3c4e32993758026630484
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Fri May 29 15:38:17 2020 +0530

    HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  10 +
 .../TestReplicationScenariosAcrossInstances.java   | 145 +++++++++++++++
 pom.xml                                            |   2 +
 ql/if/queryplan.thrift                             |   4 +-
 ql/pom.xml                                         |  50 +++++
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp      |  10 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h        |   4 +-
 .../apache/hadoop/hive/ql/plan/api/StageType.java  |   8 +-
 ql/src/gen/thrift/gen-php/Types.php                |   4 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py       |   6 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb        |   6 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java    |   6 +
 .../hadoop/hive/ql/exec/repl/AtlasDumpTask.java    | 204 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/AtlasDumpWork.java    |  61 ++++++
 .../hadoop/hive/ql/exec/repl/AtlasLoadTask.java    | 137 ++++++++++++++
 .../hadoop/hive/ql/exec/repl/AtlasLoadWork.java    |  58 ++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  19 ++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  20 +-
 .../hive/ql/exec/repl/atlas/AtlasReplInfo.java     | 101 ++++++++++
 .../ql/exec/repl/atlas/AtlasRequestBuilder.java    | 192 +++++++++++++++++++
 .../hive/ql/exec/repl/atlas/AtlasRestClient.java   |  43 +++++
 .../ql/exec/repl/atlas/AtlasRestClientBuilder.java |  97 ++++++++++
 .../ql/exec/repl/atlas/AtlasRestClientImpl.java    | 175 ++++++++++++++++++
 .../ql/exec/repl/atlas/NoOpAtlasRestClient.java    |  57 ++++++
 .../hive/ql/exec/repl/atlas/RetryingClient.java    |  92 ++++++++++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  23 +++
 .../hadoop/hive/ql/parse/repl/dump/Utils.java      |  31 ++++
 27 files changed, 1556 insertions(+), 9 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 281c4e2..abd12c9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -539,6 +539,16 @@ public class HiveConf extends Configuration {
       true,
       "This configuration will add a deny policy on the target database for all users except hive"
         + " to avoid any update to the target database"),
+    REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false,
+            "Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."),
+    REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null,
+            "Atlas endpoint of the current cluster hive database is getting replicated from/to."),
+    REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null,
+            "Target hive database name Atlas metadata of source hive database is being replicated to."),
+    REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null,
+            "Name of the source cluster for the replication."),
+    REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
+            "Name of the target cluster for the replication."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 65f7303..d7b360c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -43,11 +43,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1538,4 +1545,142 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  //Tests only the configs; existing replication should be unaffected.
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+    verifyAtlasMetadataPresent();
+
+    confMap.remove("hive.repl.atlas.replicatedto");
+    replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+  }
+
+  @Test
+  public void testAtlasMissingConfigs() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)");
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+    confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
+    ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true);
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true);
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true);
+    confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true);
+    confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+    primary.dump(primaryDbName, getAtlasClause(confMap));
+    verifyAtlasMetadataPresent();
+
+    confMap.clear();
+    confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+    confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas");
+    ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false);
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false);
+    confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false);
+    confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+    primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap));
+  }
+
+  private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable {
+    try {
+      if (dump) {
+        primary.dump(primaryDbName, atlasClause);
+      } else {
+        primary.load(replicatedDbName, primaryDbName, atlasClause);
+      }
+    } catch (MalformedURLException e) {
+      return;
+    }
+    Assert.fail("Atlas endpoint is invalid and but test didn't fail:" + endpoint);
+  }
+
+  private void verifyAtlasMetadataPresent() throws IOException {
+    Path dbReplDir = new Path(primary.repldDir,
+            Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name())));
+    FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf());
+    assertTrue(fs.exists(dbReplDir));
+    FileStatus[] dumpRoots = fs.listStatus(dbReplDir);
+    assert(dumpRoots.length == 1);
+    Path dumpRoot = dumpRoots[0].getPath();
+    assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, ReplUtils.REPL_HIVE_BASE_DIR)));
+    Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR);
+    assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot));
+    assertTrue("Atlas export file doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME)));
+    assertTrue("Atlas dump metadata doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)));
+    BufferedReader br = null;
+    try {
+      br = new BufferedReader(new InputStreamReader(
+              fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]);
+      assertEquals(0, Long.parseLong(lineContents[1]));
+    } finally {
+      if (br != null) {
+        br.close();
+      }
+    }
+  }
+
+  private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable {
+    try {
+      if (dump) {
+        primary.dump(primaryDbName, clause);
+      } else {
+        primary.load(replicatedDbName, primaryDbName, clause);
+      }
+      Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
+    } catch (SemanticException e) {
+      assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication"));
+    }
+  }
+
+  private Map<String, String> defaultAtlasConfMap() {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
+    confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas");
+    confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName);
+    confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0");
+    confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1");
+    return confMap;
+  }
+
+  private List<String> getAtlasClause(Map<String, String> confMap) {
+    List<String> confList = new ArrayList<>();
+    for (Map.Entry<String, String> entry:confMap.entrySet()) {
+      confList.add(quote(entry.getKey()) + "=" + quote(entry.getValue()));
+    }
+    return confList;
+  }
+
+  private String quote(String str) {
+    return "'" + str + "'";
+  }
 }
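
For orientation, verifyAtlasMetadataPresent() above checks the on-disk layout the dump now produces; roughly (directory names are the ReplUtils/EximUtil constants, shown symbolically since their literal values are defined elsewhere in this patch):

    <hive.repl.rootdir>/<base64(dbname)>/<dump-id>/
        REPL_HIVE_BASE_DIR/                 -- regular Hive metadata/data dump
        REPL_ATLAS_BASE_DIR/
            REPL_ATLAS_EXPORT_FILE_NAME     -- Atlas export payload
            EximUtil.METADATA_NAME          -- single-line dump metadata (see AtlasDumpTask below)
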
diff --git a/pom.xml b/pom.xml
index b4b41ea..cedcb2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
     <apache-directory-server.version>1.5.7</apache-directory-server.version>
     <!-- Include arrow for LlapOutputFormatService -->
     <arrow.version>0.10.0</arrow.version>
+    <atlas.client.version>2.0.0</atlas.client.version>
     <avatica.version>1.12.0</avatica.version>
     <avro.version>1.8.2</avro.version>
     <calcite.version>1.21.0</calcite.version>
@@ -123,6 +124,7 @@
     <commons-codec.version>1.7</commons-codec.version>
     <commons-collections.version>3.2.2</commons-collections.version>
     <commons-compress.version>1.19</commons-compress.version>
+    <commons-configuration.version>1.10</commons-configuration.version>
     <commons-exec.version>1.1</commons-exec.version>
     <commons-io.version>2.6</commons-io.version>
     <commons-lang3.version>3.9</commons-lang3.version>
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index d7d7cdc..91bff61 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -109,7 +109,9 @@ enum StageType {
   SCHEDULED_QUERY_MAINT,
   ACK,
   RANGER_DUMP,
-  RANGER_LOAD
+  RANGER_LOAD,
+  ATLAS_DUMP,
+  ATLAS_LOAD
 }
 
 struct Stage {
diff --git a/ql/pom.xml b/ql/pom.xml
index 816b400..689d2d1 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -38,6 +38,56 @@
     <!-- intra-project -->
     <!-- used for vector code-gen -->
     <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-client-v2</artifactId>
+      <version>${atlas.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>jul-to-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-client-common</artifactId>
+      <version>${atlas.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>jul-to-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.atlas</groupId>
+      <artifactId>atlas-intg</artifactId>
+      <version>${atlas.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>jul-to-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>${commons-configuration.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-vector-code-gen</artifactId>
       <version>${project.version}</version>
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 00847d5..c7338e1 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -126,7 +126,9 @@ int _kStageTypeValues[] = {
   StageType::SCHEDULED_QUERY_MAINT,
   StageType::ACK,
   StageType::RANGER_DUMP,
-  StageType::RANGER_LOAD
+  StageType::RANGER_LOAD,
+  StageType::ATLAS_DUMP,
+  StageType::ATLAS_LOAD
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -149,9 +151,11 @@ const char* _kStageTypeNames[] = {
   "SCHEDULED_QUERY_MAINT",
   "ACK",
   "RANGER_DUMP",
-  "RANGER_LOAD"
+  "RANGER_LOAD",
+  "ATLAS_DUMP",
+  "ATLAS_LOAD
 };
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(23, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
 
 Adjacency::~Adjacency() throw() {
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index b3d9cf7..1e8c702 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -103,7 +103,9 @@ struct StageType {
     SCHEDULED_QUERY_MAINT = 17,
     ACK = 18,
     RANGER_DUMP = 19,
-    RANGER_LOAD = 20
+    RANGER_LOAD = 20,
+    ATLAS_DUMP = 21,
+    ATLAS_LOAD = 22
   };
 };
 
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 31c3429..eba0230 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -32,7 +32,9 @@ public enum StageType implements org.apache.thrift.TEnum {
   SCHEDULED_QUERY_MAINT(17),
   ACK(18),
   RANGER_DUMP(19),
-  RANGER_LOAD(20);
+  RANGER_LOAD(20),
+  ATLAS_DUMP(21),
+  ATLAS_LOAD(22);
 
   private final int value;
 
@@ -95,6 +97,10 @@ public enum StageType implements org.apache.thrift.TEnum {
         return RANGER_DUMP;
       case 20:
         return RANGER_LOAD;
+      case 21:
+        return ATLAS_DUMP;
+      case 22:
+        return ATLAS_LOAD;
       default:
         return null;
     }
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index d805d21..7cb3bea 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -125,6 +125,8 @@ final class StageType {
   const ACK = 18;
   const RANGER_DUMP = 19;
   const RANGER_LOAD = 20;
+  const ATLAS_DUMP = 21;
+  const ATLAS_LOAD = 22;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -147,6 +149,8 @@ final class StageType {
     18 => 'ACK',
     19 => 'RANGER_DUMP',
     20 => 'RANGER_LOAD',
+    21 => 'ATLAS_DUMP',
+    22 => 'ATLAS_LOAD',
   );
 }
 
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 33a53ce..39a20c3 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -172,6 +172,8 @@ class StageType:
   ACK = 18
   RANGER_DUMP = 19
   RANGER_LOAD = 20
+  ATLAS_DUMP = 21
+  ATLAS_LOAD = 22
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -195,6 +197,8 @@ class StageType:
     18: "ACK",
     19: "RANGER_DUMP",
     20: "RANGER_LOAD",
+    21: "ATLAS_DUMP",
+    22: "ATLAS_LOAD",
   }
 
   _NAMES_TO_VALUES = {
@@ -219,6 +223,8 @@ class StageType:
     "ACK": 18,
     "RANGER_DUMP": 19,
     "RANGER_LOAD": 20,
+    "ATLAS_DUMP": 21,
+    "ATLAS_LOAD": 22,
   }
 
 
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index d31ba3c..0e827b2 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -82,8 +82,10 @@ module StageType
   ACK = 18
   RANGER_DUMP = 19
   RANGER_LOAD = 20
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD]).freeze
+  ATLAS_DUMP = 21
+  ATLAS_LOAD = 22
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD", 21 => "ATLAS_DUMP", 22 => "ATLAS_LOAD"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD, ATLAS_DUMP, ATLAS_LOAD]).freeze
 end
 
 class Adjacency
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index f0e5461..fd2715d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.hive.ql.ddl.DDLTask;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
@@ -120,6 +124,8 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class));
     taskvec.add(new TaskTuple<RangerDumpWork>(RangerDumpWork.class, RangerDumpTask.class));
     taskvec.add(new TaskTuple<RangerLoadWork>(RangerLoadWork.class, RangerLoadTask.class));
+    taskvec.add(new TaskTuple<AtlasDumpWork>(AtlasDumpWork.class, AtlasDumpTask.class));
+    taskvec.add(new TaskTuple<AtlasLoadWork>(AtlasLoadWork.class, AtlasLoadTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
     taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
new file mode 100644
index 0000000..26cdc6b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClient atlasRestClient;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
+              .getClient(atlasReplInfo.getConf());
+      AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+      String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(),
+              atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+    String errorFormat = "%s is mandatory config for Atlas metadata replication";
+    //Also validates URL for endpoint.
+    String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
+            .toString();
+    String tgtDB = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, conf, errorFormat);
+    String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
+    String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), tgtDB, srcCluster,
+            tgtCluster, work.getStagingDir(), conf);
+    atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    long lastTimeStamp = work.isBootstrap() ? 0L : lastStoredTimeStamp();
+    atlasReplInfo.setTimeStamp(lastTimeStamp);
+    return atlasReplInfo;
+  }
+
+  private long lastStoredTimeStamp() throws SemanticException {
+    Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = prevMetadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+      String line = br.readLine();
+      if (line == null) {
+        throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file");
+      }
+      String[] lineContents = line.split("\t", 5);
+      return Long.parseLong(lineContents[1]);
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("Current timestamp is: {}", ret);
+    return ret;
+  }
+
+  private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
+          throws SemanticException {
+    InputStream inputStream = null;
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              atlasReplInfo.getSrcCluster());
+      inputStream = atlasRestClient.exportData(exportRequest);
+      FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
+      Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+      long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
+      LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten);
+    } catch (SemanticException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName,
+                                     String srcDb)
+          throws SemanticException {
+    AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
+    Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
+    if (entries == null || entries.isEmpty()) {
+      throw new SemanticException("Could find entries in objectId for:" + clusterName);
+    }
+    Map.Entry<String, Object> item = entries.iterator().next();
+    String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue());
+    if (guid == null || guid.isEmpty()) {
+      throw new SemanticException("Entity not found:" + objectId);
+    }
+    return guid;
+  }
+
+  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+    Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    atlasReplInfo.getSrcFsUri(),
+                    String.valueOf(lastModifiedTime)
+            )
+    );
+    Utils.writeOutput(listValues, dumpFile, conf, true);
+    LOG.debug("Stored metadata for Atlas dump at:", dumpFile.toString());
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.ATLAS_DUMP;
+  }
+
+  @Override
+  public String getName() {
+    return "ATLAS_DUMP";
+  }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
+}
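
A note on the dump metadata written by createDumpMetadata() above: it is a single tab-separated line in the EximUtil.METADATA_NAME file under the Atlas staging directory, containing the source fs.defaultFS URI followed by the change-marker timestamp (0 on bootstrap). lastStoredTimeStamp() here and getStoredFsUri() in AtlasLoadTask read it back by splitting on tabs. An illustrative line, with a hypothetical URI (<TAB> stands for the tab character):

    hdfs://source-namenode:8020<TAB>0
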
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
new file mode 100644
index 0000000..3344152
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+
+/**
+ * Atlas metadata replication dump work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+  }
+
+  public boolean isBootstrap() {
+    return bootstrap;
+  }
+
+  public Path getPrevAtlasDumpDir() {
+    return prevAtlasDumpDir;
+  }
+
+  public String getSrcDB() {
+    return srcDB;
+  }
+
+  public Path getStagingDir() {
+    return stagingDir;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
new file mode 100644
index 0000000..fceded5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo  = createAtlasReplInfo();
+      LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      int importCount = importAtlasMetadata(atlasReplInfo);
+      LOG.info("Atlas entities import count {}", importCount);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while loading atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+    String errorFormat = "%s is mandatory config for Atlas metadata replication";
+    //Also validates URL for endpoint.
+    String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
+            .toString();
+    String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
+    String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(),
+            srcCluster, tgtCluster, work.getStagingDir(), conf);
+    atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+    atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    return atlasReplInfo;
+  }
+
+  private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
+    Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = metadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+      String line = br.readLine();
+      if (line == null) {
+        throw new SemanticException("Could not read stored src FS Uri from atlas metadata file");
+      }
+      String[] lineContents = line.split("\t", 5);
+      return lineContents[0];
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+    AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(
+            atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
+            atlasReplInfo.getSrcCluster(), atlasReplInfo.getTgtCluster(),
+            atlasReplInfo.getSrcFsUri(), atlasReplInfo.getTgtFsUri());
+    AtlasImportResult result = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
+            .getClient(atlasReplInfo.getConf()).importData(importRequest, atlasReplInfo);
+    if (result == null || result.getProcessedEntities() == null) {
+      LOG.info("No Atlas entity found");
+      return 0;
+    }
+    return result.getProcessedEntities().size();
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.ATLAS_LOAD;
+  }
+
+  @Override
+  public String getName() {
+    return "ATLAS_LOAD";
+  }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
new file mode 100644
index 0000000..4dc1ea8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+
+  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) {
+    this.srcDB = srcDB;
+    this.tgtDB = tgtDB;
+    this.stagingDir = stagingDir;
+  }
+
+  public static long getSerialVersionUID() {
+    return serialVersionUID;
+  }
+
+  public String getSrcDB() {
+    return srcDB;
+  }
+
+  public String getTgtDB() {
+    return tgtDB;
+  }
+
+  public Path getStagingDir() {
+    return stagingDir;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 516b6f4..046b6a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -147,6 +147,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
           Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
           work.setCurrentDumpPath(currentDumpPath);
+          if (shouldDumpAtlasMetadata()) {
+            addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath);
+            LOG.info("Added task to dump atlas metadata.");
+          }
           if (shouldDumpAuthorizationMetadata()) {
             initiateAuthorizationDumpTask();
           }
@@ -195,6 +199,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA);
   }
 
+  private boolean shouldDumpAtlasMetadata() {
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
+  }
+
   private Path getEncodedDumpRootPath() throws UnsupportedEncodingException {
     return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
             Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
@@ -228,6 +236,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
+  private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) {
+    Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR);
+    Path prevAtlasDumpDir = prevHiveDumpDir == null ? null
+                            : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
+    AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir);
+    Task<?> atlasDumpTask = TaskFactory.get(atlasDumpWork, conf);
+    childTasks = new ArrayList<>();
+    childTasks.add(atlasDumpTask);
+  }
+
+
   private void finishRemainingTasks() throws SemanticException {
     Path dumpAckFile = new Path(work.getCurrentDumpPath(),
             ReplUtils.REPL_HIVE_BASE_DIR + File.separator
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7a30962..792e331 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -107,6 +107,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       }
       work.setRootTask(this);
       this.parentTasks = null;
+      if (shouldLoadAtlasMetadata()) {
+        addAtlasLoadTask();
+      }
       if (shouldLoadAuthorizationMetadata()) {
         initiateAuthorizationLoadTask();
       }
@@ -141,8 +144,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       childTasks.add(rangerLoadTask);
     } else {
       throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
-          + " not supported for replication ");
+              + " not supported for replication ");
+    }
+  }
+
+  private void addAtlasLoadTask() throws HiveException {
+    Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
+    LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
+    AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir);
+    Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
+    if (childTasks == null) {
+      childTasks = new ArrayList<>();
     }
+    childTasks.add(atlasLoadTask);
+  }
+
+  private boolean shouldLoadAtlasMetadata() {
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
   }
 
   private int executeBootStrapLoad() throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
new file mode 100644
index 0000000..b0923d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Atlas metadata replication info holder.
+ */
+public class AtlasReplInfo {
+  private final String srcDB;
+  private final String tgtDB;
+  private final String srcCluster;
+  private final String tgtCluster;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final String atlasEndpoint;
+  private String srcFsUri;
+  private String tgtFsUri;
+  private long timeStamp;
+
+  public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String srcCluster,
+                       String tgtCluster, Path stagingDir, HiveConf conf) {
+    this.atlasEndpoint = atlasEndpoint;
+    this.srcDB = srcDB;
+    this.tgtDB = tgtDB;
+    this.srcCluster = srcCluster;
+    this.tgtCluster = tgtCluster;
+    this.stagingDir = stagingDir;
+    this.conf = conf;
+  }
+
+  public String getSrcDB() {
+    return srcDB;
+  }
+
+  public String getTgtDB() {
+    return tgtDB;
+  }
+
+  public String getSrcCluster() {
+    return srcCluster;
+  }
+
+  public String getTgtCluster() {
+    return tgtCluster;
+  }
+
+  public Path getStagingDir() {
+    return stagingDir;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public String getAtlasEndpoint() {
+    return atlasEndpoint;
+  }
+
+  public String getSrcFsUri() {
+    return srcFsUri;
+  }
+
+  public void setSrcFsUri(String srcFsUri) {
+    this.srcFsUri = srcFsUri;
+  }
+
+  public String getTgtFsUri() {
+    return tgtFsUri;
+  }
+
+  public void setTgtFsUri(String tgtFsUri) {
+    this.tgtFsUri = tgtFsUri;
+  }
+
+  public long getTimeStamp() {
+    return timeStamp;
+  }
+
+  public void setTimeStamp(long timeStamp) {
+    this.timeStamp = timeStamp;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
new file mode 100644
index 0000000..3c72f8f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class);
+  public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+  static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+  static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+  private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+  private static final String ATTRIBUTE_NAME_NAME = ".name";
+  private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+  private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom";
+  private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+  private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME;
+  private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME;
+  private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION;
+  private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION;
+
+  private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+  private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer);
+    Map<String, Object> options = getOptions(atlasReplInfo);
+    return createRequest(itemsToExport, options);
+  }
+
+  private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {
+    List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
+    final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB());
+    atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+    return atlasObjectIds;
+  }
+
+  private AtlasExportRequest createRequest(final List<AtlasObjectId> itemsToExport,
+                                           final Map<String, Object> options) {
+    AtlasExportRequest request = new AtlasExportRequest() {
+      {
+        setItemsToExport(itemsToExport);
+        setOptions(options);
+      }
+    };
+    LOG.debug("createRequest: {}" + request);
+    return request;
+  }
+
+  private Map<String, Object> getOptions(AtlasReplInfo atlasReplInfo) {
+    String targetCluster = atlasReplInfo.getTgtCluster();
+    Map<String, Object> options = new HashMap<>();
+    options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
+    options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp());
+    options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true);
+    if (targetCluster != null && !targetCluster.isEmpty()) {
+      options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster);
+    }
+    return options;
+  }
+
+  public AtlasObjectId getItemToExport(String srcCluster, String srcDB) {
+    final String qualifiedName = getQualifiedName(srcCluster, srcDB);
+    return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+  }
+
+  private String getQualifiedName(String clusterName, String srcDb) {
+    String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, srcDb.toLowerCase(), clusterName);
+    LOG.debug("Atlas getQualifiedName: {}", qualifiedName);
+    return qualifiedName;
+  }
+
+  public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet,
+                                                String sourceClusterName, String targetClusterName,
+                                                String sourceFsEndpoint, String targetFsEndpoint) {
+    AtlasImportRequest request = new AtlasImportRequest();
+    addTransforms(request.getOptions(),
+            sourceClusterName, targetClusterName,
+            sourceDataSet, targetDataSet,
+            sourceFsEndpoint, targetFsEndpoint);
+    addReplicatedFrom(request.getOptions(), sourceClusterName);
+    LOG.debug("Atlas metadata import request: {}" + request);
+    return request;
+  }
+
+  private void addTransforms(Map<String, String> options, String srcClusterName,
+                             String tgtClusterName, String sourceDataSet, String targetDataSet,
+                             String sourcefsEndpoint, String targetFsEndpoint) {
+    List<AttributeTransform> transforms = new ArrayList<>();
+    String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName);
+    addClassificationTransform(transforms,
+            String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName));
+    addClearReplicationAttributesTransform(transforms);
+    addClusterRenameTransform(transforms, srcClusterName, tgtClusterName);
+    if (!sourceDataSet.equals(targetDataSet)) {
+      addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet);
+    }
+    addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint);
+    options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms));
+  }
+
+  private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) {
+    transforms.add(create(
+            HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+    transforms.add(create(
+            HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+  }
+
+  private void addDataSetRenameTransform(List<AttributeTransform> transforms,
+                                         String sourceDataSet, String targetDataSet) {
+    transforms.add(create(
+            HIVE_DB_NAME, "EQUALS: " + sourceDataSet,
+            HIVE_DB_NAME, "SET: " + targetDataSet));
+  }
+
+  private void addClusterRenameTransform(List<AttributeTransform> transforms,
+                                         String srcClusterName, String tgtClustername) {
+    transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName,
+            HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername));
+  }
+
+  private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) {
+    options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName);
+  }
+
+  private void addClassificationTransform(List<AttributeTransform> transforms, String classificationName) {
+    transforms.add(create("__entity", "topLevel: ",
+            "__entity", "ADD_CLASSIFICATION: " + classificationName));
+  }
+
+  private String sanitizeForClassificationName(String s) {
+    if (StringUtils.isEmpty(s)) {
+      return s;
+    }
+    return s.replace('-', '_').replace(' ', '_');
+  }
+
+  private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) {
+    Map<String, String> actions = new HashMap<>();
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:");
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:");
+
+    transforms.add(new AttributeTransform(null, actions));
+  }
+
+  private AttributeTransform create(String conditionLhs, String conditionRhs,
+                                    String actionLhs, String actionRhs) {
+    return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs),
+            Collections.singletonMap(actionLhs, actionRhs));
+  }
+}
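For reference, a minimal sketch of how the request builder above can be driven on the load side. The database names, cluster names and filesystem endpoints are illustrative placeholders, and the sketch assumes the class keeps a no-arg constructor:

    AtlasRequestBuilder requestBuilder = new AtlasRequestBuilder();
    // Renames cluster and database on import, rewrites HDFS locations,
    // and tags imported entities with the replication source cluster.
    AtlasImportRequest importRequest = requestBuilder.createImportRequest(
        "srcdb", "tgtdb",                              // source/target database
        "srcCluster", "tgtCluster",                    // source/target cluster
        "hdfs://src-nn:8020", "hdfs://tgt-nn:8020");   // source/target fs endpoints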
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
new file mode 100644
index 0000000..dd72f83
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.InputStream;
+
+/**
+ * Client interface encapsulating the Atlas REST APIs used for replication.
+ */
+public interface AtlasRestClient {
+
+  InputStream exportData(AtlasExportRequest request) throws Exception;
+
+  AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception;
+
+  AtlasServer getServer(String endpoint) throws SemanticException;
+
+  String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName)
+          throws SemanticException;
+
+  boolean getStatus() throws SemanticException;
+}
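A rough sketch of the call flow this interface is meant to support, assuming a configured client, a staging directory on the dump path, and already-built export/import requests (variable names are illustrative):

    // Dump side: export from the source Atlas and persist the stream under the staging dir.
    InputStream exportedData = client.exportData(exportRequest);
    Path exportFile = new Path(stagingDir, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
    Utils.writeFile(fs, exportFile, exportedData);

    // Load side: replay the persisted export into the target Atlas.
    AtlasImportResult result = client.importData(importRequest, atlasReplInfo);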
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
new file mode 100644
index 0000000..37b623f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  public AtlasRestClientBuilder(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+  }
+
+  public AtlasRestClient getClient(HiveConf conf) throws SemanticException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return new NoOpAtlasRestClient();
+    }
+    return create();
+  }
+
+  private AtlasRestClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setUGInfo();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
+            this.userGroupInformation.getShortUserName(), baseUrls);
+    return new AtlasRestClientImpl(clientV2);
+  }
+
+  private AtlasRestClientBuilder setUGInfo() throws SemanticException {
+    try {
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("AuthStrategy: Kerberos : urls: {} : userGroupInformation: {}", baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setUGInfo: UserGroupInformation.getLoginUser failed!", e);
+    }
+    return this;
+  }
+
+  private void initializeAtlasApplicationProperties() throws SemanticException {
+    try {
+      Properties props = new Properties();
+      props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1");
+      props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0");
+      props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl);
+      props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true");
+      ApplicationProperties.set(ConfigurationConverter.getConfiguration(props));
+    } catch (AtlasException e) {
+      throw new SemanticException(e);
+    }
+  }
+}
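A hedged usage sketch for the builder; the endpoint URL below is a placeholder:

    // Returns the no-op client when HiveConf.ConfVars.HIVE_IN_TEST_REPL is set,
    // otherwise a Kerberos-authenticated AtlasClientV2 wrapped in AtlasRestClientImpl.
    AtlasRestClient client =
        new AtlasRestClientBuilder("http://atlas-host.example.com:21000").getClient(hiveConf);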
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
new file mode 100644
index 0000000..c8d738e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of AtlasRestClient that encapsulates Atlas' REST APIs.
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Future<T> future = executor.submit(callable);
+    executor.shutdown();
+    try {
+      return future.get(timeout, timeUnit);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      throw e;
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof Exception) {
+        throw (Exception) t;
+      } else {
+        throw new IllegalStateException(t);
+      }
+    }
+  }
+
+  public InputStream exportData(AtlasExportRequest request) throws Exception {
+    LOG.debug("exportData: {}" + request);
+    return invokeWithRetry(new Callable<InputStream>() {
+      @Override
+      public InputStream call() throws Exception {
+        return clientV2.exportData(request);
+      }
+    }, null);
+  }
+
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportResult defaultResult = getDefaultAtlasImportResult(request);
+    Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+    FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf());
+    if (!fs.exists(exportFilePath)) {
+      return defaultResult;
+    }
+    LOG.debug("Atlas import data request: {}" + request);
+    return invokeWithRetry(new Callable<AtlasImportResult>() {
+      @Override
+      public AtlasImportResult call() throws Exception {
+        InputStream is = null;
+        try {
+          is = fs.open(exportFilePath);
+          return clientV2.importData(request, is);
+        } finally {
+          if (is != null) {
+            is.close();
+          }
+        }
+      }
+    }, defaultResult);
+  }
+
+  private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) {
+    return new AtlasImportResult(request, "", "", "", 0L);
+  }
+
+  public AtlasServer getServer(String endpoint) throws SemanticException {
+    try {
+      return clientV2.getServer(endpoint);
+    } catch (AtlasServiceException e) {
+      int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
+      if (statusCode != NOT_FOUND.getStatusCode()) {
+        throw new SemanticException("Exception while getServer ", e.getCause());
+      }
+      LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage());
+    }
+    return null;
+  }
+
+  public String getEntityGuid(final String entityType,
+                              final String attributeName, final String qualifiedName) throws SemanticException {
+    int entityApiTimeOut = 10;
+    final Map<String, String> attributes = new HashMap<String, String>() {
+      {
+        put(attributeName, qualifiedName);
+      }
+    };
+
+    try {
+      AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout(
+          new Callable<AtlasEntity.AtlasEntityWithExtInfo>() {
+            @Override
+            public AtlasEntity.AtlasEntityWithExtInfo call() throws Exception {
+              return clientV2.getEntityByAttribute(entityType, attributes);
+            }
+          }, entityApiTimeOut, TimeUnit.SECONDS);
+
+      if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) {
+        LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}",
+                entityType, attributeName, qualifiedName);
+        return null;
+      }
+      return entityWithExtInfo.getEntity().getGuid();
+    } catch (AtlasServiceException e) {
+      int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
+      if (statusCode != NOT_FOUND.getStatusCode()) {
+        throw new SemanticException("Exception while getEntityGuid ", e.getCause());
+      }
+      LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}",
+              entityType, attributeName, qualifiedName, e.getMessage());
+      return null;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  public boolean getStatus() throws SemanticException {
+    try {
+      return clientV2.isServerReady();
+    } catch (AtlasServiceException e) {
+      throw new SemanticException(e.getCause());
+    }
+  }
+}
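A sketch of how the lookup helpers above might be used while preparing an incremental export; the literal type and attribute strings stand in for the ATLAS_TYPE_HIVE_DB and ATTRIBUTE_QUALIFIED_NAME constants used by AtlasRequestBuilder, and the server and qualified names are illustrative:

    // Source server entry; null if Atlas has not registered the cluster yet.
    AtlasServer sourceServer = client.getServer("srcCluster");
    // Guid of the hive_db entity being replicated; the lookup times out after 10 seconds.
    String dbGuid = client.getEntityGuid("hive_db", "qualifiedName", "testdb@srcCluster");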
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
new file mode 100644
index 0000000..59dcbf7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+/**
+ * Dummy (no-op) implementation of AtlasRestClient.
+ * To be used for testing.
+ */
+public class NoOpAtlasRestClient implements AtlasRestClient {
+
+  public InputStream exportData(AtlasExportRequest request) {
+    return new ByteArrayInputStream("Dummy".getBytes(Charset.forName("UTF-8")));
+  }
+
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) {
+    return new AtlasImportResult(request, "", "", "", 0L);
+  }
+
+  public AtlasServer getServer(String endpoint) {
+    return new AtlasServer();
+  }
+
+  public String getEntityGuid(final String entityType,
+                              final String attributeName, final String qualifiedName) {
+    return UUID.randomUUID().toString();
+  }
+
+  public boolean getStatus() {
+    return true;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
new file mode 100644
index 0000000..dbc065a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Implements retry logic for Atlas service calls.
+ */
+public class RetryingClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT = 30 * 1000;
+  private static final int RETRY_COUNT_DEFAULT = 5;
+  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+  private static final int MAX_RETRY_COUNT = RETRY_COUNT_DEFAULT;
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT;
+
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+    for (int currentRetryCount = 1; currentRetryCount <= MAX_RETRY_COUNT; currentRetryCount++) {
+      try {
+        LOG.debug("Invoking method: {}, attempt: {}", func.getClass().getName(), currentRetryCount);
+        return func.call();
+      } catch (Exception e) {
+        if (processImportExportLockException(e, currentRetryCount)) {
+          continue;
+        }
+        if (processInvalidParameterException(e)) {
+          return null;
+        }
+        LOG.error(func.getClass().getName(), e);
+        throw new Exception(e);
+      }
+    }
+    return defaultReturnValue;
+  }
+
+  private boolean processInvalidParameterException(Exception e) {
+    if (e instanceof UniformInterfaceException) {
+      return true;
+    }
+    if (!(e instanceof AtlasServiceException)) {
+      return false;
+    }
+    if (e.getMessage() == null) {
+      return false;
+    }
+    return (e.getMessage().contains(ERROR_MESSAGE_NO_ENTITIES)
+            || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
+  }
+
+  private boolean processImportExportLockException(Exception e, int currentRetryCount) throws Exception {
+    if (!(e instanceof AtlasServiceException)) {
+      return false;
+    }
+    String excMessage = e.getMessage() == null ? "" : e.getMessage();
+    if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
+      try {
+        int pauseDuration = PAUSE_DURATION_INCREMENT_IN_MS * currentRetryCount;
+        LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", pauseDuration);
+        Thread.sleep(pauseDuration);
+      } catch (InterruptedException intEx) {
+        LOG.error("Pause wait interrupted!", intEx);
+        throw new Exception(intEx);
+      }
+      return true;
+    }
+    return false;
+  }
+}
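As a usage sketch, each REST call in AtlasRestClientImpl is wrapped in a Callable and handed to invokeWithRetry; with the defaults above the client makes up to 5 attempts, and the pause between attempts grows by 30 seconds each time Atlas reports that another import/export is in progress (variable names are illustrative):

    return invokeWithRetry(new Callable<AtlasImportResult>() {
      @Override
      public AtlasImportResult call() throws Exception {
        // Retried when Atlas reports "import or export is in progress".
        return clientV2.importData(importRequest, inputStream);
      }
    }, defaultResult);   // Returned if all retries are exhausted.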
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 543ceca..c0aadb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
@@ -82,6 +83,19 @@ public class ReplUtils {
   // Root base directory name for ranger.
   public static final String REPL_RANGER_BASE_DIR = "ranger";
 
+  // Root base directory name for atlas.
+  public static final String REPL_ATLAS_BASE_DIR = "atlas";
+
+  // Atlas metadata export file name.
+  public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip";
+
+  // Config key for the Hadoop default file system.
+  public static final String DEFAULT_FS_CONFIG = "fs.defaultFS";
+
+  // Cluster name separator, used when the cluster name also contains the data center name, e.g. dc$mycluster1.
+  public static final String CLUSTER_NAME_SEPARATOR = "$";
+
+
   // Name of the directory which stores the list of tables included in the policy in case of table level replication.
   // One file per database, named after the db name. The directory is not created for db level replication.
   public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
@@ -184,6 +198,15 @@ public class ReplUtils {
     return false;
   }
 
+  public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat)
+          throws SemanticException {
+    String val = hiveConf.get(configParam);
+    if (StringUtils.isEmpty(val)) {
+      throw new SemanticException(String.format(errorMsgFormat, configParam));
+    }
+    return val;
+  }
+
   public static boolean isTableMigratingToTransactional(HiveConf conf,
                                                  org.apache.hadoop.hive.metastore.api.Table tableObj)
   throws TException, IOException {
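A hedged example of reading a mandatory replication config with the new getNonEmpty helper; the ConfVars constant and the message format below are illustrative:

    String atlasEndpoint = ReplUtils.getNonEmpty(
        HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf,
        "Config %s is mandatory for Atlas metadata replication");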
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 44320a5..154f028 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,6 +59,7 @@ import java.util.UUID;
 public class Utils {
   private static Logger LOG = LoggerFactory.getLogger(Utils.class);
   public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+  private static final int DEF_BUF_SIZE = 8 * 1024;
 
   public enum ReplDumpState {
     IDLE, ACTIVE
@@ -97,6 +100,34 @@ public class Utils {
     }
   }
 
+  public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws SemanticException {
+    Retry<Long> retriable = new Retry<Long>(IOException.class) {
+      @Override
+      public Long execute() throws IOException {
+        FSDataOutputStream fos = null;
+        try {
+          long bytesWritten;
+          fos = fs.create(exportFilePath);
+          byte[] buffer = new byte[DEF_BUF_SIZE];
+          int bytesRead;
+          while ((bytesRead = is.read(buffer)) != -1) {
+            fos.write(buffer, 0, bytesRead);
+          }
+          bytesWritten = fos.getPos();
+          return bytesWritten;
+        } finally {
+          if (fos != null) {
+            fos.close();
+          }
+        }
+      }};
+    try {
+      return retriable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static void writeOutput(String content, Path outputFile, HiveConf hiveConf)
           throws SemanticException {
     Retry<Void> retriable = new Retry<Void>(IOException.class) {