Posted to commits@hive.apache.org by an...@apache.org on 2020/05/12 04:48:44 UTC

[hive] branch master updated: HIVE-23351: Ranger Replication Scheduling (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fdf6758  HIVE-23351: Ranger Replication Scheduling (Aasha Medhi, reviewed by Pravin Kumar Sinha)
fdf6758 is described below

commit fdf67580618193c46c7f06d08522e1da2bbaf3b9
Author: Aasha Medhi <aa...@gmail.com>
AuthorDate: Tue May 12 10:18:32 2020 +0530

    HIVE-23351: Ranger Replication Scheduling (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   16 +
 .../hive/ql/parse/TestReplicationScenarios.java    |   11 +-
 .../TestReplicationScenariosAcrossInstances.java   |   68 +
 ql/pom.xml                                         |    5 +
 .../apache/hadoop/hive/ql/plan/api/StageType.java  |   10 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java    |    6 +
 .../hadoop/hive/ql/exec/repl/RangerDumpTask.java   |  130 ++
 .../hadoop/hive/ql/exec/repl/RangerDumpWork.java   |   50 +
 .../hadoop/hive/ql/exec/repl/RangerLoadTask.java   |  137 ++
 .../hadoop/hive/ql/exec/repl/RangerLoadWork.java   |   59 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   31 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  550 +++----
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |    8 +-
 .../ql/exec/repl/ranger/NoOpRangerRestClient.java  |   71 +
 .../ql/exec/repl/ranger/RangerBaseModelObject.java |  191 +++
 .../exec/repl/ranger/RangerExportPolicyList.java   |   52 +
 .../hive/ql/exec/repl/ranger/RangerPolicy.java     | 1513 ++++++++++++++++++++
 .../hive/ql/exec/repl/ranger/RangerPolicyList.java |   72 +
 .../hive/ql/exec/repl/ranger/RangerRestClient.java |   50 +
 .../ql/exec/repl/ranger/RangerRestClientImpl.java  |  359 +++++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |    7 +
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |    3 +-
 .../hive/ql/exec/repl/TestRangerDumpTask.java      |  115 ++
 .../hive/ql/exec/repl/TestRangerLoadTask.java      |  106 ++
 24 files changed, 3349 insertions(+), 271 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4f2ea9a..f5ad3a8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -526,6 +526,22 @@ public class HiveConf extends Configuration {
         "This is the base directory on the target/replica warehouse under which data for "
             + "external tables is stored. This is relative base path and hence prefixed to the source "
             + "external table path on target cluster."),
+    REPL_INCLUDE_AUTHORIZATION_METADATA("hive.repl.include.authorization.metadata", false,
+            "This configuration will enable replication of security and authorization related "
+                    + "metadata along with the Hive data and metadata replication. "),
+    REPL_AUTHORIZATION_PROVIDER_SERVICE("hive.repl.authorization.provider.service", "ranger",
+            "This configuration will define which service provides the security and authorization "
+                    + "related metadata that needs to be replicated along "
+                    + "with the Hive data and metadata replication. Set the configuration "
+                    + "hive.repl.include.authorization.metadata to false to disable "
+                    + "replication of security policies. "),
+    REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT("hive.repl.authorization.provider.service.endpoint",
+            "",
+            "This configuration will define the authorization service endpoint"),
+    REPL_RANGER_SERVICE_NAME("hive.repl.ranger.service.name",
+            "hive",
+            "This configuration will define the service name for which the Ranger authorization"
+                    + " policies need to be replicated"),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index fa96b87..641df00 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -424,11 +424,12 @@ public class TestReplicationScenarios {
     return validator.hasTask(rootTask);
   }
 
-  private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple) throws Throwable {
+  private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
+                                   Tuple tuple) throws Throwable {
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb,
+    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
             null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
@@ -448,7 +449,7 @@ public class TestReplicationScenarios {
     Tuple dump = replDumpDb(dbName);
 
     //bootstrap load should not have move task
-    Task task = getReplLoadRootTask(dbNameReplica, false, dump);
+    Task task = getReplLoadRootTask(dbName, dbNameReplica, false, dump);
     assertEquals(false, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
@@ -462,7 +463,7 @@ public class TestReplicationScenarios {
 
     // Partition level statistics gets updated as part of the INSERT above. So we see a partition
     // task corresponding to an ALTER_PARTITION event.
-    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
     assertEquals(true, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
@@ -475,7 +476,7 @@ public class TestReplicationScenarios {
     dump = replDumpDb(dbName);
 
     //no move task should be added as the operation is adding a dynamic partition
-    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    task = getReplLoadRootTask(dbName, dbNameReplica, true, dump);
     assertEquals(false, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 1adc4fb..901a4ed 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1493,4 +1493,72 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("china", "india"));
   }
+
+  /*
+  Can't test complete replication as a mini Ranger service is not supported.
+  Testing just the configs and that there is no impact on existing replication.
+   */
+  @Test
+  public void testRangerReplication() throws Throwable {
+    List<String> clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
+        "'hive.in.test'='true'",
+        "'hive.repl.authorization.provider.service.endpoint'='http://localhost:6080/ranger'");
+    primary.run("use " + primaryDbName)
+        .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)")
+        .dump(primaryDbName, clause);
+
+    replica.load(replicatedDbName, primaryDbName, clause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"acid_table", "table1"})
+        .run("select * from table1")
+        .verifyResults(new String[] {"1", "2"});
+  }
+
+  /*
+  Can't test complete replication as a mini Ranger service is not supported.
+  Testing just the configs and that there is no impact on existing replication.
+   */
+  @Test
+  public void testFailureRangerReplication() throws Throwable {
+    List<String> clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
+        "'hive.in.test'='true'");
+    primary.run("use " + primaryDbName)
+        .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)");
+    try {
+      primary.dump(primaryDbName, clause);
+    } catch (Exception e) {
+      assertEquals("Ranger endpoint is not valid. Please pass a valid config "
+          + "hive.repl.authorization.provider.service.endpoint", e.getMessage());
+    }
+  }
+
+  /*
+  Can't test complete replication as a mini Ranger service is not supported.
+  Testing just the configs and that there is no impact on existing replication.
+   */
+  @Test
+  public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
+    List<String> clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'",
+        "'hive.in.test'='true'", "'hive.repl.authorization.provider.service'='sentry'");
+    primary.run("use " + primaryDbName)
+        .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+            "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i String)")
+        .run("insert into table1 values (1)")
+        .run("insert into table1 values (2)");
+    try {
+      primary.dump(primaryDbName, clause);
+    } catch (SemanticException e) {
+      assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
+    }
+  }
 }
diff --git a/ql/pom.xml b/ql/pom.xml
index 7c7c82a..d6dc7ce 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -816,6 +816,11 @@
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.contribs</groupId>
+      <artifactId>jersey-multipart</artifactId>
+      <version>${jersey.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 25d530c..57b028b 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -26,7 +26,9 @@ public enum StageType implements org.apache.thrift.TEnum {
   REPL_TXN(15),
   REPL_INCREMENTAL_LOAD(16),
   SCHEDULED_QUERY_MAINT(17),
-  ACK(18);
+  ACK(18),
+  RANGER_DUMP(19),
+  RANGER_LOAD(20);
 
   private final int value;
 
@@ -83,6 +85,12 @@ public enum StageType implements org.apache.thrift.TEnum {
         return REPL_INCREMENTAL_LOAD;
       case 17:
         return SCHEDULED_QUERY_MAINT;
+      case 18:
+        return ACK;
+      case 19:
+        return RANGER_DUMP;
+      case 20:
+        return RANGER_LOAD;
       default:
         return null;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index c82e8d2..f0e5461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -37,6 +37,10 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.DirCopyTask;
 import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork;
+import org.apache.hadoop.hive.ql.exec.repl.RangerLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.RangerLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.RangerDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.RangerDumpTask;
 import org.apache.hadoop.hive.ql.exec.schq.ScheduledQueryMaintenanceTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -114,6 +118,8 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
     taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class));
+    taskvec.add(new TaskTuple<RangerDumpWork>(RangerDumpWork.class, RangerDumpTask.class));
+    taskvec.add(new TaskTuple<RangerLoadWork>(RangerLoadWork.class, RangerLoadTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
     taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
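
With the two tuples above registered, TaskFactory resolves the new work objects to their tasks the same way the replication driver does further down in this patch (see initiateAuthorizationDumpTask and initiateAuthorizationLoadTask). A one-line illustration, assuming rangerDumpRoot (a Path) and conf (a HiveConf) are already in scope:

    Task<RangerDumpWork> rangerDumpTask =
        TaskFactory.get(new RangerDumpWork(rangerDumpRoot, "source_db"), conf);
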
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
new file mode 100644
index 0000000..f9d3de7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.NoOpRangerRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME;
+
+/**
+ * RangerDumpTask.
+ *
+ * Exports the Ranger security policies to the staging directory.
+ **/
+public class RangerDumpTask extends Task<RangerDumpWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(RangerDumpTask.class);
+
+  private transient RangerRestClient rangerRestClient;
+
+  public RangerDumpTask() {
+    super();
+  }
+
+  @VisibleForTesting
+  RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
+    this.conf = conf;
+    this.work = work;
+    this.rangerRestClient = rangerRestClient;
+  }
+
+  @Override
+  public String getName() {
+    return "RANGER_DUMP";
+  }
+
+  @Override
+  public int execute() {
+    try {
+      int exportCount = 0;
+      Path filePath = null;
+      LOG.info("Exporting Ranger Metadata");
+      if (rangerRestClient == null) {
+        rangerRestClient = getRangerRestClient();
+      }
+      String rangerEndpoint = conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT);
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+        throw new Exception("Ranger endpoint is not valid. "
+                + "Please pass a valid config hive.repl.authorization.provider.service.endpoint");
+      }
+      String rangerHiveServiceName = conf.getVar(REPL_RANGER_SERVICE_NAME);
+      RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
+              work.getDbName(), rangerHiveServiceName);
+      List<RangerPolicy> rangerPolicies = rangerExportPolicyList.getPolicies();
+      if (rangerPolicies.isEmpty()) {
+        LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+        rangerExportPolicyList = new RangerExportPolicyList();
+      } else {
+        rangerPolicies = rangerRestClient.removeMultiResourcePolicies(rangerPolicies);
+      }
+      if (!CollectionUtils.isEmpty(rangerPolicies)) {
+        rangerExportPolicyList.setPolicies(rangerPolicies);
+        filePath = rangerRestClient.saveRangerPoliciesToFile(rangerExportPolicyList,
+                work.getCurrentDumpPath(), ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, conf);
+        if (filePath != null) {
+          LOG.info("Ranger policy export finished successfully");
+          exportCount = rangerExportPolicyList.getListSize();
+        }
+      }
+      LOG.debug("Ranger policy export filePath:" + filePath);
+      LOG.info("Number of ranger policies exported {}", exportCount);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private RangerRestClient getRangerRestClient() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      return new NoOpRangerRestClient();
+    }
+    return new RangerRestClientImpl();
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.RANGER_DUMP;
+  }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
+}
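
The package-private constructor above (annotated @VisibleForTesting) lets the task be exercised with a stubbed REST client. Below is a rough sketch in the spirit of the TestRangerDumpTask added later in this patch; it is hypothetical, assumes JUnit and Mockito, assumes RangerExportPolicyList.getPolicies() is mockable, and would have to live in the org.apache.hadoop.hive.ql.exec.repl package because the constructor is package-private:

    // Hypothetical sketch, not part of the commit; the real coverage is in TestRangerDumpTask below.
    package org.apache.hadoop.hive.ql.exec.repl;

    import java.util.ArrayList;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
    import org.junit.Assert;
    import org.junit.Test;
    import org.mockito.Mockito;

    public class RangerDumpTaskSketchTest {
      @Test
      public void dumpWithNoPoliciesReturnsSuccess() throws Exception {
        RangerRestClient client = Mockito.mock(RangerRestClient.class);
        RangerExportPolicyList emptyList = Mockito.mock(RangerExportPolicyList.class);
        Mockito.when(client.checkConnection(Mockito.anyString())).thenReturn(true);
        Mockito.when(client.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
            .thenReturn(emptyList);
        Mockito.when(emptyList.getPolicies()).thenReturn(new ArrayList<RangerPolicy>());

        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT,
            "http://localhost:6080/ranger");
        RangerDumpWork work = new RangerDumpWork(new Path("/tmp/ranger_dump"), "source_db");
        RangerDumpTask task = new RangerDumpTask(client, conf, work);

        // With a valid endpoint and an empty policy list, nothing is written and the task succeeds.
        Assert.assertEquals(0, task.execute());
      }
    }
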
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java
new file mode 100644
index 0000000..873a2ec
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.Serializable;
+
+/**
+ * RangerDumpWork.
+ *
+ * Work to export Ranger authorization policies.
+ **/
+@Explain(displayName = "Ranger Dump Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class RangerDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private Path currentDumpPath;
+  private String dbName;
+
+  public RangerDumpWork(Path currentDumpPath, String dbName) {
+    this.currentDumpPath = currentDumpPath;
+    this.dbName = dbName;
+  }
+
+  public Path getCurrentDumpPath() {
+    return currentDumpPath;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
new file mode 100644
index 0000000..5497d28
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.NoOpRangerRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME;
+
+/**
+ * RangerLoadTask.
+ *
+ * Task to import Ranger authorization policies.
+ **/
+public class RangerLoadTask extends Task<RangerLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(RangerLoadTask.class);
+
+  private transient RangerRestClient rangerRestClient;
+
+  public RangerLoadTask() {
+    super();
+  }
+
+  @VisibleForTesting
+  RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work) {
+    this.conf = conf;
+    this.work = work;
+    this.rangerRestClient = rangerRestClient;
+  }
+
+  @Override
+  public String getName() {
+    return "RANGER_LOAD";
+  }
+
+  @Override
+  public int execute() {
+    try {
+      LOG.info("Importing Ranger Metadata");
+      RangerExportPolicyList rangerExportPolicyList = null;
+      List<RangerPolicy> rangerPolicies = null;
+      if (rangerRestClient == null) {
+        rangerRestClient = getRangerRestClient();
+      }
+      String rangerEndpoint = conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT);
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+        throw new Exception("Ranger endpoint is not valid. "
+                + "Please pass a valid config hive.repl.authorization.provider.service.endpoint");
+      }
+      if (work.getCurrentDumpPath() != null) {
+        LOG.info("Importing Ranger Metadata from {} ", work.getCurrentDumpPath());
+        rangerExportPolicyList = rangerRestClient.readRangerPoliciesFromJsonFile(new Path(work.getCurrentDumpPath(),
+                ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME), conf);
+        if (rangerExportPolicyList != null && !CollectionUtils.isEmpty(rangerExportPolicyList.getPolicies())) {
+          rangerPolicies = rangerExportPolicyList.getPolicies();
+        }
+      }
+
+      if (CollectionUtils.isEmpty(rangerPolicies)) {
+        LOG.info("There are no ranger policies to import");
+        rangerPolicies = new ArrayList<>();
+      }
+      List<RangerPolicy> updatedRangerPolicies = rangerRestClient.changeDataSet(rangerPolicies, work.getSourceDbName(),
+              work.getTargetDbName());
+      int importCount = 0;
+      if (!CollectionUtils.isEmpty(updatedRangerPolicies)) {
+        if (rangerExportPolicyList == null) {
+          rangerExportPolicyList = new RangerExportPolicyList();
+        }
+        rangerExportPolicyList.setPolicies(updatedRangerPolicies);
+        rangerRestClient.importRangerPolicies(rangerExportPolicyList, work.getTargetDbName(), rangerEndpoint,
+                conf.getVar(REPL_RANGER_SERVICE_NAME));
+        LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize());
+        importCount = rangerExportPolicyList.getListSize();
+        LOG.info("Ranger policy import finished {} ", importCount);
+      }
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private RangerRestClient getRangerRestClient() {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      return new NoOpRangerRestClient();
+    }
+    return new RangerRestClientImpl();
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.RANGER_LOAD;
+  }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
+}
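
A corresponding load-side sketch under the same assumptions as the dump sketch above (hypothetical, Mockito stubs, same package, placed in a test method declared throws Exception):

    // Hypothetical sketch; the committed coverage is in TestRangerLoadTask below.
    RangerRestClient client = Mockito.mock(RangerRestClient.class);
    Mockito.when(client.checkConnection(Mockito.anyString())).thenReturn(true);

    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT,
        "http://localhost:6080/ranger");
    RangerLoadWork work = new RangerLoadWork(new Path("/tmp/ranger_dump"), "source_db", "target_db");
    RangerLoadTask task = new RangerLoadTask(client, conf, work);

    // readRangerPoliciesFromJsonFile and changeDataSet are left unstubbed (they return null),
    // so no policies are imported and execute() is expected to return 0.
    int status = task.execute();
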
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java
new file mode 100644
index 0000000..64f5df0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * RangerLoadWork.
+ *
+ * Work to import Ranger authorization policies.
+ **/
+@Explain(displayName = "Ranger Load Operator", explainLevels = { Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class RangerLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(RangerLoadWork.class);
+  private Path currentDumpPath;
+  private String targetDbName;
+  private String sourceDbName;
+
+  public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName) {
+    this.currentDumpPath = currentDumpPath;
+    this.targetDbName = targetDbName;
+    this.sourceDbName = sourceDbName;
+  }
+
+  public Path getCurrentDumpPath() {
+    return currentDumpPath;
+  }
+
+  public String getTargetDbName() {
+    return targetDbName;
+  }
+
+  public String getSourceDbName() {
+    return sourceDbName;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 062f367..a7fd0ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -99,6 +99,7 @@ import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -144,6 +145,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         if (shouldDump(previousValidHiveDumpPath)) {
           Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
           Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+          work.setCurrentDumpPath(currentDumpPath);
+          if (shouldDumpAuthorizationMetadata()) {
+            initiateAuthorizationDumpTask();
+          }
           DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
           // Initialize ReplChangeManager instance since we will require it to encode file URI.
           ReplChangeManager.getInstance(conf);
@@ -156,7 +161,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
           }
           work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
-          work.setCurrentDumpPath(currentDumpPath);
           initiateDataCopyTasks();
         } else {
           LOG.info("Previous Dump is not yet loaded");
@@ -170,6 +174,26 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }
 
+  private void initiateAuthorizationDumpTask() throws SemanticException {
+    if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) {
+      Path rangerDumpRoot = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_RANGER_BASE_DIR);
+      LOG.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot);
+      RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern);
+      Task<RangerDumpWork> rangerDumpTask = TaskFactory.get(rangerDumpWork, conf);
+      if (childTasks == null) {
+        childTasks = new ArrayList<>();
+      }
+      childTasks.add(rangerDumpTask);
+    } else {
+      throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+              + " not supported for replication ");
+    }
+  }
+
+  private boolean shouldDumpAuthorizationMetadata() {
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA);
+  }
+
   private Path getEncodedDumpRootPath() throws UnsupportedEncodingException {
     return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
             Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
@@ -190,7 +214,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
   private void initiateDataCopyTasks() throws SemanticException {
     TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
-    List<Task<?>> childTasks = new ArrayList<>();
+    if (childTasks == null) {
+      childTasks = new ArrayList<>();
+    }
     childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
     childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
     if (childTasks.isEmpty()) {
@@ -198,7 +224,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       finishRemainingTasks();
     } else {
       DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
-      this.childTasks = childTasks;
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index b578d48..7a30962 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,6 +73,7 @@ import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
 
 public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -97,206 +100,247 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
   @Override
   public int execute() {
-    Task<?> rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
+    try {
+      Task<?> rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      if (shouldLoadAuthorizationMetadata()) {
+        initiateAuthorizationLoadTask();
+      }
+      if (work.isIncrementalLoad()) {
+        return executeIncrementalLoad();
+      } else {
+        return executeBootStrapLoad();
+      }
+    } catch (RuntimeException e) {
+      LOG.error("replication failed with run time exception", e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("replication failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
-    work.setRootTask(this);
-    this.parentTasks = null;
-    if (work.isIncrementalLoad()) {
-      return executeIncrementalLoad();
+  }
+
+  private boolean shouldLoadAuthorizationMetadata() {
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA);
+  }
+
+  private void initiateAuthorizationLoadTask() throws SemanticException {
+    if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) {
+      Path rangerLoadRoot = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR);
+      LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot);
+      RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn);
+      Task<RangerLoadWork> rangerLoadTask = TaskFactory.get(rangerLoadWork, conf);
+      if (childTasks == null) {
+        childTasks = new ArrayList<>();
+      }
+      childTasks.add(rangerLoadTask);
     } else {
-      return executeBootStrapLoad();
+      throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE)
+          + " not supported for replication ");
     }
   }
 
-  private int executeBootStrapLoad() {
-    try {
-      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context loadContext = new Context(work.dumpDirectory, conf, getHive(),
-              work.sessionStateLineageState, context);
-      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
-      /*
-          for now for simplicity we are doing just one directory ( one database ), come back to use
-          of multiple databases once we have the basic flow to chain creating of tasks in place for
-          a database ( directory )
-      */
-      BootstrapEventsIterator iterator = work.bootstrapIterator();
-      ConstraintEventsIterator constraintIterator = work.constraintsIterator();
+  private int executeBootStrapLoad() throws Exception {
+    int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+    Context loadContext = new Context(work.dumpDirectory, conf, getHive(),
+        work.sessionStateLineageState, context);
+    TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+    /*
+        for now for simplicity we are doing just one directory ( one database ), come back to use
+        of multiple databases once we have the basic flow to chain creating of tasks in place for
+        a database ( directory )
+    */
+    BootstrapEventsIterator iterator = work.bootstrapIterator();
+    ConstraintEventsIterator constraintIterator = work.constraintsIterator();
+    /*
+    This is used to get hold of a reference during the current creation of tasks and is initialized
+    with "0" tasks such that it will be non consequential in any operations done with task tracker
+    compositions.
+     */
+    TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+    TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+    Scope scope = new Scope();
+    boolean loadingConstraint = false;
+    if (!iterator.hasNext() && constraintIterator.hasNext()) {
+      loadingConstraint = true;
+    }
+    while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()))
+        && loadTaskTracker.canAddMoreTasks()) {
+      BootstrapEvent next;
+      if (!loadingConstraint) {
+        next = iterator.next();
+      } else {
+        next = constraintIterator.next();
+      }
+      switch (next.eventType()) {
+      case Database:
+        DatabaseEvent dbEvent = (DatabaseEvent) next;
+        dbTracker = new LoadDatabase(loadContext, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks();
+        loadTaskTracker.update(dbTracker);
+        if (work.hasDbState()) {
+          loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
+        } else {
+          // Scope might have set to database in some previous iteration of loop, so reset it to false if database
+          // tracker has no tasks.
+          scope.database = false;
+        }
+        work.updateDbEventState(dbEvent.toState());
+        if (dbTracker.hasTasks()) {
+          scope.rootTasks.addAll(dbTracker.tasks());
+          scope.database = true;
+        }
+        dbTracker.debugLog("database");
+        break;
+      case Table:
       /*
-      This is used to get hold of a reference during the current creation of tasks and is initialized
-      with "0" tasks such that it will be non consequential in any operations done with task tracker
-      compositions.
+          Implicit assumption here is that database level is processed first before table level,
+          which will depend on the iterator used since it should provide the higher level directory
+          listing before providing the lower level listing. This is also required such that
+          the dbTracker /  tableTracker are setup correctly always.
        */
-      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
-      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
-      Scope scope = new Scope();
-      boolean loadingConstraint = false;
-      if (!iterator.hasNext() && constraintIterator.hasNext()) {
-        loadingConstraint = true;
-      }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()))
-              && loadTaskTracker.canAddMoreTasks()) {
-        BootstrapEvent next;
-        if (!loadingConstraint) {
-          next = iterator.next();
+        TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
+        FSTableEvent tableEvent = (FSTableEvent) next;
+        if (TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
+          tableTracker = new TaskTracker(1);
+          tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf));
         } else {
-          next = constraintIterator.next();
+          LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
+              loadTaskTracker);
+          tableTracker = loadTable.tasks(work.isIncrementalLoad());
         }
-        switch (next.eventType()) {
-        case Database:
-          DatabaseEvent dbEvent = (DatabaseEvent) next;
-          dbTracker = new LoadDatabase(loadContext, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks();
-          loadTaskTracker.update(dbTracker);
-          if (work.hasDbState()) {
-            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
-          }  else {
-            // Scope might have set to database in some previous iteration of loop, so reset it to false if database
-            // tracker has no tasks.
-            scope.database = false;
-          }
-          work.updateDbEventState(dbEvent.toState());
-          if (dbTracker.hasTasks()) {
-            scope.rootTasks.addAll(dbTracker.tasks());
-            scope.database = true;
-          }
-          dbTracker.debugLog("database");
-          break;
-        case Table: {
-          /*
-              Implicit assumption here is that database level is processed first before table level,
-              which will depend on the iterator used since it should provide the higher level directory
-              listing before providing the lower level listing. This is also required such that
-              the dbTracker /  tableTracker are setup correctly always.
-           */
-          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
-          FSTableEvent tableEvent = (FSTableEvent) next;
-          if (TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
-            tableTracker = new TaskTracker(1);
-            tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf));
-          } else {
-            LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
-                loadTaskTracker);
-            tableTracker = loadTable.tasks(work.isIncrementalLoad());
-          }
-
-          setUpDependencies(dbTracker, tableTracker);
-          if (!scope.database && tableTracker.hasTasks()) {
-            scope.rootTasks.addAll(tableTracker.tasks());
-            scope.table = true;
-          } else {
-            // Scope might have set to table in some previous iteration of loop, so reset it to false if table
-            // tracker has no tasks.
-            scope.table = false;
-          }
 
-          if (!TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
-            /*
-              for table replication if we reach the max number of tasks then for the next run we will
-              try to reload the same table again, this is mainly for ease of understanding the code
-              as then we can avoid handling == > loading partitions for the table given that
-              the creation of table lead to reaching max tasks vs,  loading next table since current
-              one does not have partitions.
-             */
-
-            // for a table we explicitly try to load partitions as there is no separate partitions events.
-            LoadPartitions loadPartitions =
-                new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
-                    work.dbNameToLoadIn, tableContext);
-            TaskTracker partitionsTracker = loadPartitions.tasks();
-            partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
-                partitionsTracker);
-            tableTracker.debugLog("table");
-            partitionsTracker.debugLog("partitions for table");
-          }
-          break;
+        setUpDependencies(dbTracker, tableTracker);
+        if (!scope.database && tableTracker.hasTasks()) {
+          scope.rootTasks.addAll(tableTracker.tasks());
+          scope.table = true;
+        } else {
+          // Scope might have set to table in some previous iteration of loop, so reset it to false if table
+          // tracker has no tasks.
+          scope.table = false;
         }
-        case Partition: {
-          /*
-              This will happen only when loading tables and we reach the limit of number of tasks we can create;
-              hence we know here that the table should exist and there should be a lastPartitionName
-          */
-          PartitionEvent event = (PartitionEvent) next;
-          TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
+
+        if (!TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) {
+        /*
+          for table replication if we reach the max number of tasks then for the next run we will
+          try to reload the same table again, this is mainly for ease of understanding the code
+          as then we can avoid handling == > loading partitions for the table given that
+          the creation of table lead to reaching max tasks vs,  loading next table since current
+          one does not have partitions.
+         */
+
+          // for a table we explicitly try to load partitions as there is no separate partitions events.
           LoadPartitions loadPartitions =
-              new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
-                      event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
-          /*
-               the tableTracker here should be a new instance and not an existing one as this can
-               only happen when we break in between loading partitions.
-           */
+              new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
+                  work.dbNameToLoadIn, tableContext);
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
               partitionsTracker);
-          partitionsTracker.debugLog("partitions");
-          break;
-        }
-        case Function: {
-          LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(),
-                                              (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
-          TaskTracker functionsTracker = loadFunction.tasks();
-          if (!scope.database) {
-            scope.rootTasks.addAll(functionsTracker.tasks());
-          } else {
-            setUpDependencies(dbTracker, functionsTracker);
-          }
-          loadTaskTracker.update(functionsTracker);
-          functionsTracker.debugLog("functions");
-          break;
-        }
-        case Constraint: {
-          LoadConstraint loadConstraint =
-              new LoadConstraint(loadContext, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
-          TaskTracker constraintTracker = loadConstraint.tasks();
-          scope.rootTasks.addAll(constraintTracker.tasks());
-          loadTaskTracker.update(constraintTracker);
-          constraintTracker.debugLog("constraints");
-        }
-        }
-
-        if (!loadingConstraint && !iterator.currentDbHasNext()) {
-          createEndReplLogTask(loadContext, scope, iterator.replLogger());
+          tableTracker.debugLog("table");
+          partitionsTracker.debugLog("partitions for table");
         }
+        break;
+      case Partition:
+      /*
+          This will happen only when loading tables and we reach the limit of number of tasks we can create;
+          hence we know here that the table should exist and there should be a lastPartitionName
+      */
+        addLoadPartitionTasks(loadContext, next, dbTracker, iterator, scope, loadTaskTracker, tableTracker);
+        break;
+      case Function:
+        loadTaskTracker.update(addLoadFunctionTasks(loadContext, iterator, next, dbTracker, scope));
+        break;
+      case Constraint:
+        loadTaskTracker.update(addLoadConstraintsTasks(loadContext, next, dbTracker, scope));
+        break;
+      default:
+        break;
       }
-
-      boolean addAnotherLoadTask = iterator.hasNext()
-          || loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext();
-
-      if (addAnotherLoadTask) {
-        createBuilderTask(scope.rootTasks);
-      }
-
-      // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
-      // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
-      // last repl ID of the database.
-      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
-        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
-        work.updateDbEventState(null);
+      if (!loadingConstraint && !iterator.currentDbHasNext()) {
+        createEndReplLogTask(loadContext, scope, iterator.replLogger());
       }
-      this.childTasks = scope.rootTasks;
-      /*
-      Since there can be multiple rounds of this run all of which will be tied to the same
-      query id -- generated in compile phase , adding a additional UUID to the end to print each run
-      in separate files.
-       */
-      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+    }
+    boolean addAnotherLoadTask = iterator.hasNext()
+        || loadTaskTracker.hasReplicationState()
+        || constraintIterator.hasNext();
 
-      // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
-      context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs());
-      createReplLoadCompleteAckTask();
-    }  catch (RuntimeException e) {
-      LOG.error("replication failed with run time exception", e);
-      throw e;
-    } catch (Exception e) {
-      LOG.error("replication failed", e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    if (addAnotherLoadTask) {
+      createBuilderTask(scope.rootTasks);
+    }
+    // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
+    // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
+    // last repl ID of the database.
+    if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
+      loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
+      work.updateDbEventState(null);
     }
+    if (childTasks == null) {
+      childTasks = new ArrayList<>();
+    }
+    childTasks.addAll(scope.rootTasks);
+    /*
+    Since there can be multiple rounds of this run all of which will be tied to the same
+    query id -- generated in compile phase, adding an additional UUID to the end to print each run
+    in separate files.
+     */
+    LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+    // Populate the driver context with the scratch dir info from the repl context, so that the
+    // temp dirs will be cleaned up later
+    context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs());
+    createReplLoadCompleteAckTask();
     LOG.info("completed load task run : {}", work.executedLoadTask());
     return 0;
   }
 
+  private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker,
+                                            BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker,
+                                            TaskTracker tableTracker) throws Exception {
+    PartitionEvent event = (PartitionEvent) next;
+    TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
+    LoadPartitions loadPartitions =
+        new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
+        event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
+        /*
+             the tableTracker here should be a new instance and not an existing one as this can
+             only happen when we break in between loading partitions.
+         */
+    TaskTracker partitionsTracker = loadPartitions.tasks();
+    partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+        partitionsTracker);
+    partitionsTracker.debugLog("partitions");
+    return partitionsTracker;
+  }
+
+  private TaskTracker addLoadConstraintsTasks(Context loadContext,
+                                              BootstrapEvent next,
+                                              TaskTracker dbTracker,
+                                              Scope scope) throws IOException, SemanticException {
+    LoadConstraint loadConstraint =
+        new LoadConstraint(loadContext, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
+    TaskTracker constraintTracker = loadConstraint.tasks();
+    scope.rootTasks.addAll(constraintTracker.tasks());
+    constraintTracker.debugLog("constraints");
+    return constraintTracker;
+  }
+
+  private TaskTracker addLoadFunctionTasks(Context loadContext, BootstrapEventsIterator iterator, BootstrapEvent next,
+                                    TaskTracker dbTracker, Scope scope) throws IOException, SemanticException {
+    LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(),
+        (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+    TaskTracker functionsTracker = loadFunction.tasks();
+    if (!scope.database) {
+      scope.rootTasks.addAll(functionsTracker.tasks());
+    } else {
+      setUpDependencies(dbTracker, functionsTracker);
+    }
+    functionsTracker.debugLog("functions");
+    return functionsTracker;
+  }
+
   public static Task<?> createViewTask(MetaData metaData, String dbNameToLoadIn, HiveConf conf)
       throws SemanticException {
     Table table = new Table(metaData.getTable());
@@ -304,7 +348,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     TableName tableName = HiveTableName.ofNullable(table.getTableName(), dbName);
     String dbDotView = tableName.getNotEmptyDbTable();
     CreateViewDesc desc = new CreateViewDesc(dbDotView, table.getAllCols(), null, table.getParameters(),
-        table.getPartColNames(), false, false, false, table.getSd().getInputFormat(), table.getSd().getOutputFormat(),
+        table.getPartColNames(), false, false, false, table.getSd().getInputFormat(),
+        table.getSd().getOutputFormat(),
         table.getSd().getSerdeInfo().getSerializationLib());
     String originalText = table.getViewOriginalText();
     String expandedText = table.getViewExpandedText();
@@ -326,6 +371,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   /**
    * If replication policy is changed between previous and current load, then the excluded tables in
    * the new replication policy will be dropped.
+   *
    * @throws HiveException Failed to get/drop the tables.
    */
   private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveException {
@@ -340,30 +386,29 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     // List all the tables that are excluded in the current repl scope.
     Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
         tableName -> {
-          assert(tableName != null);
+          assert (tableName != null);
           return !tableName.toLowerCase().startsWith(
-                  SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
-                  && !replScope.tableIncludedInReplScope(tableName);
+              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+              && !replScope.tableIncludedInReplScope(tableName);
         });
     for (String table : tableNames) {
       db.dropTable(dbName + "." + table, true);
     }
     LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.",
-            dbName);
+        dbName);
   }
 
   private void createReplLoadCompleteAckTask() {
     if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
       AckWork replLoadAckWork = new AckWork(
-              new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
+          new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
       Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
-      if (this.childTasks.isEmpty()) {
-        this.childTasks.add(loadAckWorkTask);
+      if (childTasks.isEmpty()) {
+        childTasks.add(loadAckWorkTask);
       } else {
-        DAGTraversal.traverse(this.childTasks,
-                new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
+        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
       }
     }
   }
@@ -374,7 +419,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     if (work.isIncrementalLoad()) {
       dbProps = new HashMap<>();
       dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
-                  work.incrementalLoadTasksBuilder().eventTo().toString());
+          work.incrementalLoadTasksBuilder().eventTo().toString());
     } else {
       Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
       dbProps = dbInMetadata.getParameters();
@@ -384,15 +429,13 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     if (scope.rootTasks.isEmpty()) {
       scope.rootTasks.add(replLogTask);
     } else {
-      DAGTraversal.traverse(scope.rootTasks,
-          new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
+      DAGTraversal.traverse(scope.rootTasks, new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
     }
   }
 
   /**
    * There was a database update done before and we want to make sure we update the last repl
    * id on this database as we are now going to switch to processing a new database.
-   *
    * This has to be last task in the graph since if there are intermediate tasks and the last.repl.id
    * is a root level task then in the execution phase the root level tasks will get executed first,
    * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed
@@ -416,8 +459,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   }
 
   private void partitionsPostProcessing(BootstrapEventsIterator iterator,
-      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
-      TaskTracker partitionsTracker) {
+                                        Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+                                        TaskTracker partitionsTracker) {
     setUpDependencies(tableTracker, partitionsTracker);
     if (!scope.database && !scope.table) {
       scope.rootTasks.addAll(partitionsTracker.tasks());
@@ -452,78 +495,65 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
   }
 
-  private int executeIncrementalLoad() {
-    try {
-
-      // If replication policy is changed between previous and current repl load, then drop the tables
-      // that are excluded in the new replication policy.
-      dropTablesExcludedInReplScope(work.currentReplScope);
-
-      IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
-
-      // If incremental events are already applied, then check and perform if need to bootstrap any tables.
-      if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
-        if (work.hasBootstrapLoadTasks()) {
-          LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
-                  + "mode after applying all events.");
-          return executeBootStrapLoad();
-        }
+  private int executeIncrementalLoad() throws Exception {
+    // If replication policy is changed between previous and current repl load, then drop the tables
+    // that are excluded in the new replication policy.
+    dropTablesExcludedInReplScope(work.currentReplScope);
+    IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
+    // If incremental events are already applied, then check and perform if need to bootstrap any tables.
+    if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
+      if (work.hasBootstrapLoadTasks()) {
+        LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
+            + "mode after applying all events.");
+        return executeBootStrapLoad();
       }
-
-      List<Task<?>> childTasks = new ArrayList<>();
-      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-
-      TaskTracker tracker = new TaskTracker(maxTasks);
-      childTasks.add(builder.build(context, getHive(), LOG, tracker));
-
-      // If there are no more events to be applied, add a task to update the last.repl.id of the
-      // target database to the event id of the last event considered by the dump. Next
-      // incremental cycle won't consider the events in this dump again if it starts from this id.
-      if (!builder.hasMoreWork()) {
-        // The name of the database to be loaded into is either specified directly in REPL LOAD
-        // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
-        // metadata during table level replication.
-        String dbName = work.dbNameToLoadIn;
-        if (dbName == null || StringUtils.isBlank(dbName)) {
-          if (work.currentReplScope != null) {
-            String replScopeDbName = work.currentReplScope.getDbName();
-            if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
-              dbName = replScopeDbName;
-            }
+    }
+    List<Task<?>> childTasks = new ArrayList<>();
+    int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+    TaskTracker tracker = new TaskTracker(maxTasks);
+    childTasks.add(builder.build(context, getHive(), LOG, tracker));
+    // If there are no more events to be applied, add a task to update the last.repl.id of the
+    // target database to the event id of the last event considered by the dump. Next
+    // incremental cycle won't consider the events in this dump again if it starts from this id.
+    if (!builder.hasMoreWork()) {
+      // The name of the database to be loaded into is either specified directly in the REPL LOAD
+      // command (i.e. when dbNameToLoadIn has a valid db name) or is available through the dump
+      // metadata during table-level replication.
+      String dbName = work.dbNameToLoadIn;
+      if (dbName == null || StringUtils.isBlank(dbName)) {
+        if (work.currentReplScope != null) {
+          String replScopeDbName = work.currentReplScope.getDbName();
+          if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
+            dbName = replScopeDbName;
           }
         }
-
-        // If we are replicating to multiple databases at a time, it's not
-        // possible to know which all databases we are replicating into and hence we can not
-        // update repl id in all those databases.
-        if (StringUtils.isNotBlank(dbName)) {
-          String lastEventid = builder.eventTo().toString();
-          Map<String, String> mapProp = new HashMap<>();
-          mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
-
-          AlterDatabaseSetPropertiesDesc alterDbDesc =
-                  new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
-                          new ReplicationSpec(lastEventid, lastEventid));
-          Task<?> updateReplIdTask =
-                  TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
-
-          DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
-          work.setLastReplIDUpdated(true);
-          LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
-        }
       }
-
-      // Once all the incremental events are applied, enable bootstrap of tables if exist.
-      if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
-        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+      // If we are replicating to multiple databases at a time, it's not
+      // possible to know which databases we are replicating into, and hence we cannot
+      // update the repl id in all of them.
+      if (StringUtils.isNotBlank(dbName)) {
+        String lastEventid = builder.eventTo().toString();
+        Map<String, String> mapProp = new HashMap<>();
+        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+        AlterDatabaseSetPropertiesDesc alterDbDesc =
+            new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
+                new ReplicationSpec(lastEventid, lastEventid));
+        Task<?> updateReplIdTask =
+            TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
+        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+        work.setLastReplIDUpdated(true);
+        LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
       }
-      this.childTasks = childTasks;
-      createReplLoadCompleteAckTask();
-      return 0;
-    } catch (Exception e) {
-      LOG.error("failed replication", e);
-      setException(e);
-      return 1;
     }
+    // Once all the incremental events are applied, enable bootstrap of tables if exist.
+    if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
+      DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+    }
+    if (this.childTasks == null) {
+      this.childTasks = new ArrayList<>();
+    }
+    this.childTasks.addAll(childTasks);
+    createReplLoadCompleteAckTask();
+    return 0;
   }
 }
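
For reference, and not part of the patch itself: the target-database resolution in executeIncrementalLoad
above prefers the name given on REPL LOAD and only falls back to the replication policy's database
name when the policy covers a single database. A minimal standalone sketch of that rule, using a
hypothetical helper name (the actual code uses StringUtils.isBlank for the blank check):

    // Hypothetical helper illustrating the fallback used before updating last.repl.id.
    static String resolveTargetDbName(String dbNameToLoadIn, String replScopeDbName) {
      if (dbNameToLoadIn != null && !dbNameToLoadIn.trim().isEmpty()) {
        return dbNameToLoadIn;      // name supplied directly on REPL LOAD
      }
      if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
        return replScopeDbName;     // single-db policy name from the dump metadata
      }
      return null;                  // multiple target dbs: last.repl.id is not updated here
    }
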
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f072eff..26cd59b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -44,6 +44,7 @@ public class ReplLoadWork implements Serializable {
   final ReplScope currentReplScope;
   final String dumpDirectory;
   private boolean lastReplIDUpdated;
+  private String sourceDbName;
 
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -60,12 +61,13 @@ public class ReplLoadWork implements Serializable {
   final LineageState sessionStateLineageState;
 
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
-                      String dbNameToLoadIn, ReplScope currentReplScope,
+                      String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope,
                       LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
     this.currentReplScope = currentReplScope;
+    this.sourceDbName = sourceDbName;
 
     // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name.
     if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
@@ -152,4 +154,8 @@ public class ReplLoadWork implements Serializable {
   public void setLastReplIDUpdated(boolean lastReplIDUpdated) {
     this.lastReplIDUpdated = lastReplIDUpdated;
   }
+
+  public String getSourceDbName() {
+    return sourceDbName;
+  }
 }
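
For reference, not part of the patch: ReplLoadWork now carries the source database name alongside
the target name and exposes it through getSourceDbName(). A construction sketch with hypothetical
values (the constructor throws IOException and also initializes bootstrap iterators from the dump
directory, so a real call needs an existing dump path):

    HiveConf conf = new HiveConf();
    ReplLoadWork loadWork = new ReplLoadWork(conf, "/tmp/repl/dump1", "srcdb", "tgtdb",
        null /* currentReplScope */, null /* lineageState */, false /* isIncrementalDump */, 0L);
    assert "srcdb".equals(loadWork.getSourceDbName());
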
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java
new file mode 100644
index 0000000..4e3fa61
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.List;
+
+/**
+ * NoOpRangerRestClient returns empty policies.
+ */
+public class NoOpRangerRestClient implements RangerRestClient {
+
+  @Override
+  public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
+                                                     String dbName, String rangerHiveServiceName) {
+    return new RangerExportPolicyList();
+  }
+
+  @Override
+  public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
+                                                     String baseUrl,
+                                                     String rangerHiveServiceName) throws Exception {
+    return null;
+  }
+
+  @Override
+  public List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies) {
+    return null;
+  }
+
+  @Override
+  public List<RangerPolicy> changeDataSet(List<RangerPolicy> rangerPolicies, String sourceDbName,
+                                          String targetDbName) {
+    return null;
+  }
+
+  @Override
+  public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath,
+                                       String fileName, HiveConf conf) throws Exception {
+    return null;
+  }
+
+  @Override
+  public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, HiveConf conf) throws SemanticException {
+    return null;
+  }
+
+  @Override
+  public boolean checkConnection(String url) throws Exception {
+    return true;
+  }
+}
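
For illustration, not part of the patch: the no-op client satisfies the RangerRestClient contract
without contacting Ranger, so callers can proceed when there are no authorization policies to move.
A minimal usage sketch with hypothetical endpoint and names (the interface methods declare checked
exceptions, so real callers handle them):

    RangerRestClient client = new NoOpRangerRestClient();
    // Export never calls out to Ranger; it simply returns an empty policy list.
    RangerExportPolicyList exported =
        client.exportRangerPolicies("http://ranger.example.com:6080", "sample_db", "cm_hive");
    // checkConnection always reports success for the no-op implementation.
    boolean reachable = client.checkConnection("http://ranger.example.com:6080");
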
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java
new file mode 100644
index 0000000..2f999af
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Date;
+
+/**
+ * RangerBaseModelObject class to contain the common attributes of a Ranger base object.
+ */
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE,
+    fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class RangerBaseModelObject implements java.io.Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private Long id;
+  private String guid;
+  private Boolean isEnabled;
+  private String createdBy;
+  private String updatedBy;
+  private Date createTime;
+  private Date updateTime;
+  private Long version;
+
+  public RangerBaseModelObject() {
+    setIsEnabled(null);
+  }
+
+  public void updateFrom(RangerBaseModelObject other) {
+    setIsEnabled(other.getIsEnabled());
+  }
+
+  /**
+   * @return the id
+   */
+  public Long getId() {
+    return id;
+  }
+
+  /**
+   * @param id the id to set
+   */
+  public void setId(Long id) {
+    this.id = id;
+  }
+
+  /**
+   * @return the guid
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  /**
+   * @param guid the guid to set
+   */
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+
+  /**
+   * @return the isEnabled
+   */
+  public Boolean getIsEnabled() {
+    return isEnabled;
+  }
+
+  /**
+   * @param isEnabled the isEnabled to set
+   */
+  public void setIsEnabled(Boolean isEnabled) {
+    this.isEnabled = isEnabled == null ? Boolean.TRUE : isEnabled;
+  }
+
+  /**
+   * @return the createdBy
+   */
+  public String getCreatedBy() {
+    return createdBy;
+  }
+
+  /**
+   * @param createdBy the createdBy to set
+   */
+  public void setCreatedBy(String createdBy) {
+    this.createdBy = createdBy;
+  }
+
+  /**
+   * @return the updatedBy
+   */
+  public String getUpdatedBy() {
+    return updatedBy;
+  }
+
+  /**
+   * @param updatedBy the updatedBy to set
+   */
+  public void setUpdatedBy(String updatedBy) {
+    this.updatedBy = updatedBy;
+  }
+
+  /**
+   * @return the createTime
+   */
+  public Date getCreateTime() {
+    return new Date(createTime.getTime());
+  }
+
+  /**
+   * @param createTime the createTime to set
+   */
+  public void setCreateTime(Date createTime) {
+    this.createTime = new Date(createTime.getTime());
+  }
+
+  /**
+   * @return the updateTime
+   */
+  public Date getUpdateTime() {
+    return new Date(updateTime.getTime());
+  }
+
+  /**
+   * @param updateTime the updateTime to set
+   */
+  public void setUpdateTime(Date updateTime) {
+    this.updateTime = new Date(updateTime.getTime());
+  }
+
+  /**
+   * @return the version
+   */
+  public Long getVersion() {
+    return version;
+  }
+
+  /**
+   * @param version the version to set
+   */
+  public void setVersion(Long version) {
+    this.version = version;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    toString(sb);
+    return sb.toString();
+  }
+
+  public StringBuilder toString(StringBuilder sb) {
+    sb.append("id={").append(id).append("} ");
+    sb.append("guid={").append(guid).append("} ");
+    sb.append("isEnabled={").append(isEnabled).append("} ");
+    sb.append("createdBy={").append(createdBy).append("} ");
+    sb.append("updatedBy={").append(updatedBy).append("} ");
+    sb.append("createTime={").append(createTime).append("} ");
+    sb.append("updateTime={").append(updateTime).append("} ");
+    sb.append("version={").append(version).append("} ");
+
+    return sb;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java
new file mode 100644
index 0000000..a395feb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * RangerExportPolicyList class that extends the RangerPolicyList class.
+ */
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE,
+    fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class RangerExportPolicyList extends RangerPolicyList implements java.io.Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private Map<String, Object> metaDataInfo = new LinkedHashMap<String, Object>();
+
+  public Map<String, Object> getMetaDataInfo() {
+    return metaDataInfo;
+  }
+
+  public void setMetaDataInfo(Map<String, Object> metaDataInfo) {
+    this.metaDataInfo = metaDataInfo;
+  }
+
+}
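
For context, not part of the patch: these Ranger POJOs carry org.codehaus.jackson annotations
(field visibility, non-null inclusion, ignore-unknown) so that a policy list can be round-tripped
as JSON, e.g. when it is written to and later read from the dump staging directory. A minimal
serialization sketch assuming the codehaus ObjectMapper; the actual client implementation may wire
up JSON handling differently:

    import org.codehaus.jackson.map.ObjectMapper;

    ObjectMapper mapper = new ObjectMapper();
    RangerExportPolicyList policyList = new RangerExportPolicyList();
    String json = mapper.writeValueAsString(policyList);      // serialize for the dump file
    RangerExportPolicyList parsed =
        mapper.readValue(json, RangerExportPolicyList.class); // parse again on the load side
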
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java
new file mode 100644
index 0000000..733a898
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java
@@ -0,0 +1,1513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RangerPolicy class to contain Ranger Policy details.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class RangerPolicy extends RangerBaseModelObject implements java.io.Serializable {
+  public static final int POLICY_TYPE_ACCESS = 0;
+  public static final int POLICY_TYPE_DATAMASK = 1;
+  public static final int POLICY_TYPE_ROWFILTER = 2;
+
+  public static final int[] POLICY_TYPES = new int[]{
+      POLICY_TYPE_ACCESS,
+      POLICY_TYPE_DATAMASK,
+      POLICY_TYPE_ROWFILTER,
+  };
+
+  public static final String MASK_TYPE_NULL = "MASK_NULL";
+  public static final String MASK_TYPE_NONE = "MASK_NONE";
+  public static final String MASK_TYPE_CUSTOM = "CUSTOM";
+
+  private static final long serialVersionUID = 1L;
+
+  private String service;
+  private String name;
+  private Integer policyType;
+  private String description;
+  private String resourceSignature;
+  private Boolean isAuditEnabled;
+  private Map<String, RangerPolicyResource> resources;
+  private List<RangerPolicyItem> policyItems;
+  private List<RangerPolicyItem> denyPolicyItems;
+  private List<RangerPolicyItem> allowExceptions;
+  private List<RangerPolicyItem> denyExceptions;
+  private List<RangerDataMaskPolicyItem> dataMaskPolicyItems;
+  private List<RangerRowFilterPolicyItem> rowFilterPolicyItems;
+
+
+  /**
+   * Ranger Policy default constructor.
+   */
+  public RangerPolicy() {
+    this(null, null, null, null, null, null, null);
+  }
+
+  /**
+   * @param service
+   * @param name
+   * @param policyType
+   * @param description
+   * @param resources
+   * @param policyItems
+   * @param resourceSignature TODO
+   */
+  public RangerPolicy(String service, String name, Integer policyType, String description,
+                      Map<String, RangerPolicyResource> resources, List<RangerPolicyItem> policyItems,
+                      String resourceSignature) {
+    super();
+    setService(service);
+    setName(name);
+    setPolicyType(policyType);
+    setDescription(description);
+    setResourceSignature(resourceSignature);
+    setIsAuditEnabled(null);
+    setResources(resources);
+    setPolicyItems(policyItems);
+    setDenyPolicyItems(null);
+    setAllowExceptions(null);
+    setDenyExceptions(null);
+    setDataMaskPolicyItems(null);
+    setRowFilterPolicyItems(null);
+  }
+
+  /**
+   * @param other
+   */
+  public void updateFrom(RangerPolicy other) {
+    super.updateFrom(other);
+
+    setService(other.getService());
+    setName(other.getName());
+    setPolicyType(other.getPolicyType());
+    setDescription(other.getDescription());
+    setResourceSignature(other.getResourceSignature());
+    setIsAuditEnabled(other.getIsAuditEnabled());
+    setResources(other.getResources());
+    setPolicyItems(other.getPolicyItems());
+    setDenyPolicyItems(other.getDenyPolicyItems());
+    setAllowExceptions(other.getAllowExceptions());
+    setDenyExceptions(other.getDenyExceptions());
+    setDataMaskPolicyItems(other.getDataMaskPolicyItems());
+    setRowFilterPolicyItems(other.getRowFilterPolicyItems());
+  }
+
+  /**
+   * @return the service
+   */
+  public String getService() {
+    return service;
+  }
+
+  /**
+   * @param service the service to set
+   */
+  public void setService(String service) {
+    this.service = service;
+  }
+
+  /**
+   * @return the name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @param name the name to set
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the policyType
+   */
+  public Integer getPolicyType() {
+    return policyType;
+  }
+
+  /**
+   * @param policyType the policyType to set
+   */
+  public void setPolicyType(Integer policyType) {
+    this.policyType = policyType;
+  }
+
+  /**
+   * @return the description
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * @param description the description to set
+   */
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  /**
+   * @return the resourceSignature
+   */
+  public String getResourceSignature() {
+    return resourceSignature;
+  }
+
+  /**
+   * @param resourceSignature the resourceSignature to set
+   */
+  public void setResourceSignature(String resourceSignature) {
+    this.resourceSignature = resourceSignature;
+  }
+
+  /**
+   * @return the isAuditEnabled
+   */
+  public Boolean getIsAuditEnabled() {
+    return isAuditEnabled;
+  }
+
+  /**
+     * @param isAuditEnabled the isAuditEnabled to set
+   */
+  public void setIsAuditEnabled(Boolean isAuditEnabled) {
+    this.isAuditEnabled = isAuditEnabled == null ? Boolean.TRUE : isAuditEnabled;
+  }
+
+  /**
+   * @return the resources
+   */
+  public Map<String, RangerPolicyResource> getResources() {
+    return resources;
+  }
+
+  /**
+   * @param resources the resources to set
+   */
+  public void setResources(Map<String, RangerPolicyResource> resources) {
+    if (this.resources == null) {
+      this.resources = new HashMap<>();
+    }
+
+    if (this.resources == resources) {
+      return;
+    }
+
+    this.resources.clear();
+
+    if (resources != null) {
+      for (Map.Entry<String, RangerPolicyResource> e : resources.entrySet()) {
+        this.resources.put(e.getKey(), e.getValue());
+      }
+    }
+  }
+
+  /**
+   * @return the policyItems
+   */
+  public List<RangerPolicyItem> getPolicyItems() {
+    return policyItems;
+  }
+
+  /**
+   * @param policyItems the policyItems to set
+   */
+  public void setPolicyItems(List<RangerPolicyItem> policyItems) {
+    if (this.policyItems == null) {
+      this.policyItems = new ArrayList<>();
+    }
+
+    if (this.policyItems == policyItems) {
+      return;
+    }
+
+    this.policyItems.clear();
+
+    if (policyItems != null) {
+      this.policyItems.addAll(policyItems);
+    }
+  }
+
+  /**
+   * @return the denyPolicyItems
+   */
+  public List<RangerPolicyItem> getDenyPolicyItems() {
+    return denyPolicyItems;
+  }
+
+  /**
+   * @param denyPolicyItems the denyPolicyItems to set
+   */
+  public void setDenyPolicyItems(List<RangerPolicyItem> denyPolicyItems) {
+    if (this.denyPolicyItems == null) {
+      this.denyPolicyItems = new ArrayList<>();
+    }
+
+    if (this.denyPolicyItems == denyPolicyItems) {
+      return;
+    }
+
+    this.denyPolicyItems.clear();
+
+    if (denyPolicyItems != null) {
+      this.denyPolicyItems.addAll(denyPolicyItems);
+    }
+  }
+
+  /**
+   * @return the allowExceptions
+   */
+  public List<RangerPolicyItem> getAllowExceptions() {
+    return allowExceptions;
+  }
+
+  /**
+   * @param allowExceptions the allowExceptions to set
+   */
+  public void setAllowExceptions(List<RangerPolicyItem> allowExceptions) {
+    if (this.allowExceptions == null) {
+      this.allowExceptions = new ArrayList<>();
+    }
+
+    if (this.allowExceptions == allowExceptions) {
+      return;
+    }
+
+    this.allowExceptions.clear();
+
+    if (allowExceptions != null) {
+      this.allowExceptions.addAll(allowExceptions);
+    }
+  }
+
+  /**
+   * @return the denyExceptions
+   */
+  public List<RangerPolicyItem> getDenyExceptions() {
+    return denyExceptions;
+  }
+
+  /**
+   * @param denyExceptions the denyExceptions to set
+   */
+  public void setDenyExceptions(List<RangerPolicyItem> denyExceptions) {
+    if (this.denyExceptions == null) {
+      this.denyExceptions = new ArrayList<>();
+    }
+
+    if (this.denyExceptions == denyExceptions) {
+      return;
+    }
+
+    this.denyExceptions.clear();
+
+    if (denyExceptions != null) {
+      this.denyExceptions.addAll(denyExceptions);
+    }
+  }
+
+  public List<RangerDataMaskPolicyItem> getDataMaskPolicyItems() {
+    return dataMaskPolicyItems;
+  }
+
+  public void setDataMaskPolicyItems(List<RangerDataMaskPolicyItem> dataMaskPolicyItems) {
+    if (this.dataMaskPolicyItems == null) {
+      this.dataMaskPolicyItems = new ArrayList<>();
+    }
+
+    if (this.dataMaskPolicyItems == dataMaskPolicyItems) {
+      return;
+    }
+
+    this.dataMaskPolicyItems.clear();
+
+    if (dataMaskPolicyItems != null) {
+      this.dataMaskPolicyItems.addAll(dataMaskPolicyItems);
+    }
+  }
+
+  public List<RangerRowFilterPolicyItem> getRowFilterPolicyItems() {
+    return rowFilterPolicyItems;
+  }
+
+  public void setRowFilterPolicyItems(List<RangerRowFilterPolicyItem> rowFilterPolicyItems) {
+    if (this.rowFilterPolicyItems == null) {
+      this.rowFilterPolicyItems = new ArrayList<>();
+    }
+
+    if (this.rowFilterPolicyItems == rowFilterPolicyItems) {
+      return;
+    }
+
+    this.rowFilterPolicyItems.clear();
+
+    if (rowFilterPolicyItems != null) {
+      this.rowFilterPolicyItems.addAll(rowFilterPolicyItems);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    toString(sb);
+    return sb.toString();
+  }
+
+  public StringBuilder toString(StringBuilder sb) {
+    sb.append("RangerPolicy={");
+
+    super.toString(sb);
+
+    sb.append("service={").append(service).append("} ");
+    sb.append("name={").append(name).append("} ");
+    sb.append("policyType={").append(policyType).append("} ");
+    sb.append("description={").append(description).append("} ");
+    sb.append("resourceSignature={").append(resourceSignature).append("} ");
+    sb.append("isAuditEnabled={").append(isAuditEnabled).append("} ");
+
+    sb.append("resources={");
+    if (resources != null) {
+      for (Map.Entry<String, RangerPolicyResource> e : resources.entrySet()) {
+        sb.append(e.getKey()).append("={");
+        e.getValue().toString(sb);
+        sb.append("} ");
+      }
+    }
+    sb.append("} ");
+
+    sb.append("policyItems={");
+    if (policyItems != null) {
+      for (RangerPolicyItem policyItem : policyItems) {
+        if (policyItem != null) {
+          policyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("denyPolicyItems={");
+    if (denyPolicyItems != null) {
+      for (RangerPolicyItem policyItem : denyPolicyItems) {
+        if (policyItem != null) {
+          policyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("allowExceptions={");
+    if (allowExceptions != null) {
+      for (RangerPolicyItem policyItem : allowExceptions) {
+        if (policyItem != null) {
+          policyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("denyExceptions={");
+    if (denyExceptions != null) {
+      for (RangerPolicyItem policyItem : denyExceptions) {
+        if (policyItem != null) {
+          policyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("dataMaskPolicyItems={");
+    if (dataMaskPolicyItems != null) {
+      for (RangerDataMaskPolicyItem dataMaskPolicyItem : dataMaskPolicyItems) {
+        if (dataMaskPolicyItem != null) {
+          dataMaskPolicyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("rowFilterPolicyItems={");
+    if (rowFilterPolicyItems != null) {
+      for (RangerRowFilterPolicyItem rowFilterPolicyItem : rowFilterPolicyItems) {
+        if (rowFilterPolicyItem != null) {
+          rowFilterPolicyItem.toString(sb);
+        }
+      }
+    }
+    sb.append("} ");
+
+    sb.append("}");
+
+    return sb;
+  }
+
+  /**
+   * RangerPolicyResource class to store the resource path values.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyResource implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private List<String> values;
+    private Boolean isExcludes;
+    private Boolean isRecursive;
+
+    public RangerPolicyResource() {
+      this((List<String>) null, null, null);
+    }
+
+    public RangerPolicyResource(String value) {
+      setValue(value);
+      setIsExcludes(null);
+      setIsRecursive(null);
+    }
+
+    public RangerPolicyResource(String value, Boolean isExcludes, Boolean isRecursive) {
+      setValue(value);
+      setIsExcludes(isExcludes);
+      setIsRecursive(isRecursive);
+    }
+
+    public RangerPolicyResource(List<String> values, Boolean isExcludes, Boolean isRecursive) {
+      setValues(values);
+      setIsExcludes(isExcludes);
+      setIsRecursive(isRecursive);
+    }
+
+    /**
+     * @return the values
+     */
+    public List<String> getValues() {
+      return values;
+    }
+
+    /**
+     * @param values the values to set
+     */
+    public void setValues(List<String> values) {
+      if (this.values == null) {
+        this.values = new ArrayList<>();
+      }
+      if (this.values == values) {
+        return;
+      }
+      this.values.clear();
+      if (values != null) {
+        this.values.addAll(values);
+      }
+    }
+
+    /**
+     * @param value the value to set
+     */
+    public void setValue(String value) {
+      if (this.values == null) {
+        this.values = new ArrayList<>();
+      }
+      this.values.clear();
+      this.values.add(value);
+    }
+
+    /**
+     * @return the isExcludes
+     */
+    public Boolean getIsExcludes() {
+      return isExcludes;
+    }
+
+    /**
+     * @param isExcludes the isExcludes to set
+     */
+    public void setIsExcludes(Boolean isExcludes) {
+      this.isExcludes = isExcludes == null ? Boolean.FALSE : isExcludes;
+    }
+
+    /**
+     * @return the isRecursive
+     */
+    public Boolean getIsRecursive() {
+      return isRecursive;
+    }
+
+    /**
+     * @param isRecursive the isRecursive to set
+     */
+    public void setIsRecursive(Boolean isRecursive) {
+      this.isRecursive = isRecursive == null ? Boolean.FALSE : isRecursive;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyResource={");
+      sb.append("values={");
+      if (values != null) {
+        for (String value : values) {
+          sb.append(value).append(" ");
+        }
+      }
+      sb.append("} ");
+      sb.append("isExcludes={").append(isExcludes).append("} ");
+      sb.append("isRecursive={").append(isRecursive).append("} ");
+      sb.append("}");
+
+      return sb;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result
+          + ((isExcludes == null) ? 0 : isExcludes.hashCode());
+      result = prime * result
+          + ((isRecursive == null) ? 0 : isRecursive.hashCode());
+      result = prime * result
+          + ((values == null) ? 0 : values.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyResource other = (RangerPolicyResource) obj;
+      if (isExcludes == null) {
+        if (other.isExcludes != null) {
+          return false;
+        }
+      } else if (!isExcludes.equals(other.isExcludes)) {
+        return false;
+      }
+      if (isRecursive == null) {
+        if (other.isRecursive != null) {
+          return false;
+        }
+      } else if (!isRecursive.equals(other.isRecursive)) {
+        return false;
+      }
+      if (values == null) {
+        if (other.values != null) {
+          return false;
+        }
+      } else if (!values.equals(other.values)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * RangerPolicyItem class contains Ranger policy item details such as accesses, users, groups and conditions.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyItem implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private List<RangerPolicyItemAccess> accesses;
+    private List<String> users;
+    private List<String> groups;
+    private List<RangerPolicyItemCondition> conditions;
+    private Boolean delegateAdmin;
+
+    public RangerPolicyItem() {
+      this(null, null, null, null, null);
+    }
+
+    public RangerPolicyItem(List<RangerPolicyItemAccess> accessTypes, List<String> users, List<String> groups,
+                            List<RangerPolicyItemCondition> conditions, Boolean delegateAdmin) {
+      setAccesses(accessTypes);
+      setUsers(users);
+      setGroups(groups);
+      setConditions(conditions);
+      setDelegateAdmin(delegateAdmin);
+    }
+
+    /**
+     * @return the accesses
+     */
+    public List<RangerPolicyItemAccess> getAccesses() {
+      return accesses;
+    }
+
+    /**
+     * @param accesses the accesses to set
+     */
+    public void setAccesses(List<RangerPolicyItemAccess> accesses) {
+      if (this.accesses == null) {
+        this.accesses = new ArrayList<>();
+      }
+
+      if (this.accesses == accesses) {
+        return;
+      }
+
+      this.accesses.clear();
+
+      if (accesses != null) {
+        this.accesses.addAll(accesses);
+      }
+    }
+
+    /**
+     * @return the users
+     */
+    public List<String> getUsers() {
+      return users;
+    }
+
+    /**
+     * @param users the users to set
+     */
+    public void setUsers(List<String> users) {
+      if (this.users == null) {
+        this.users = new ArrayList<>();
+      }
+
+      if (this.users == users) {
+        return;
+      }
+
+      this.users.clear();
+
+      if (users != null) {
+        this.users.addAll(users);
+      }
+    }
+
+    /**
+     * @return the groups
+     */
+    public List<String> getGroups() {
+      return groups;
+    }
+
+    /**
+     * @param groups the groups to set
+     */
+    public void setGroups(List<String> groups) {
+      if (this.groups == null) {
+        this.groups = new ArrayList<>();
+      }
+      if (this.groups == groups) {
+        return;
+      }
+      this.groups.clear();
+      if (groups != null) {
+        this.groups.addAll(groups);
+      }
+    }
+
+    /**
+     * @return the conditions
+     */
+    public List<RangerPolicyItemCondition> getConditions() {
+      return conditions;
+    }
+
+    /**
+     * @param conditions the conditions to set
+     */
+    public void setConditions(List<RangerPolicyItemCondition> conditions) {
+      if (this.conditions == null) {
+        this.conditions = new ArrayList<>();
+      }
+      if (this.conditions == conditions) {
+        return;
+      }
+      this.conditions.clear();
+      if (conditions != null) {
+        this.conditions.addAll(conditions);
+      }
+    }
+
+    /**
+     * @return the delegateAdmin
+     */
+    public Boolean getDelegateAdmin() {
+      return delegateAdmin;
+    }
+
+    /**
+     * @param delegateAdmin the delegateAdmin to set
+     */
+    public void setDelegateAdmin(Boolean delegateAdmin) {
+      this.delegateAdmin = delegateAdmin == null ? Boolean.FALSE : delegateAdmin;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyItem={");
+      sb.append("accessTypes={");
+      if (accesses != null) {
+        for (RangerPolicyItemAccess access : accesses) {
+          if (access != null) {
+            access.toString(sb);
+          }
+        }
+      }
+      sb.append("} ");
+      sb.append("users={");
+      if (users != null) {
+        for (String user : users) {
+          if (user != null) {
+            sb.append(user).append(" ");
+          }
+        }
+      }
+      sb.append("} ");
+      sb.append("groups={");
+      if (groups != null) {
+        for (String group : groups) {
+          if (group != null) {
+            sb.append(group).append(" ");
+          }
+        }
+      }
+      sb.append("} ");
+      sb.append("conditions={");
+      if (conditions != null) {
+        for (RangerPolicyItemCondition condition : conditions) {
+          if (condition != null) {
+            condition.toString(sb);
+          }
+        }
+      }
+      sb.append("} ");
+      sb.append("delegateAdmin={").append(delegateAdmin).append("} ");
+      sb.append("}");
+
+      return sb;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result
+          + ((accesses == null) ? 0 : accesses.hashCode());
+      result = prime * result
+          + ((conditions == null) ? 0 : conditions.hashCode());
+      result = prime * result
+          + ((delegateAdmin == null) ? 0 : delegateAdmin.hashCode());
+      result = prime * result
+          + ((groups == null) ? 0 : groups.hashCode());
+      result = prime * result + ((users == null) ? 0 : users.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyItem other = (RangerPolicyItem) obj;
+      if (accesses == null) {
+        if (other.accesses != null) {
+          return false;
+        }
+      } else if (!accesses.equals(other.accesses)) {
+        return false;
+      }
+      if (conditions == null) {
+        if (other.conditions != null) {
+          return false;
+        }
+      } else if (!conditions.equals(other.conditions)) {
+        return false;
+      }
+      if (delegateAdmin == null) {
+        if (other.delegateAdmin != null) {
+          return false;
+        }
+      } else if (!delegateAdmin.equals(other.delegateAdmin)) {
+        return false;
+      }
+      if (groups == null) {
+        if (other.groups != null) {
+          return false;
+        }
+      } else if (!groups.equals(other.groups)) {
+        return false;
+      }
+      if (users == null) {
+        if (other.users != null) {
+          return false;
+        }
+      } else if (!users.equals(other.users)) {
+        return false;
+      }
+      return true;
+
+    }
+  }
+
+  /**
+   * RangerDataMaskPolicyItem class.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerDataMaskPolicyItem extends RangerPolicyItem implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+    private RangerPolicyItemDataMaskInfo dataMaskInfo;
+
+    public RangerDataMaskPolicyItem() {
+      this(null, null, null, null, null, null);
+    }
+
+    public RangerDataMaskPolicyItem(List<RangerPolicyItemAccess> accesses,
+                                    RangerPolicyItemDataMaskInfo dataMaskDetail, List<String> users,
+                                    List<String> groups,
+                                    List<RangerPolicyItemCondition> conditions, Boolean delegateAdmin) {
+      super(accesses, users, groups, conditions, delegateAdmin);
+      setDataMaskInfo(dataMaskDetail);
+    }
+
+    /**
+     * @return the dataMaskInfo
+     */
+    public RangerPolicyItemDataMaskInfo getDataMaskInfo() {
+      return dataMaskInfo;
+    }
+
+    /**
+     * @param dataMaskInfo the dataMaskInfo to set
+     */
+    public void setDataMaskInfo(RangerPolicyItemDataMaskInfo dataMaskInfo) {
+      this.dataMaskInfo = dataMaskInfo == null ? new RangerPolicyItemDataMaskInfo() : dataMaskInfo;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((dataMaskInfo == null) ? 0 : dataMaskInfo.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerDataMaskPolicyItem other = (RangerDataMaskPolicyItem) obj;
+      if (dataMaskInfo == null) {
+        if (other.dataMaskInfo != null) {
+          return false;
+        }
+      } else if (!dataMaskInfo.equals(other.dataMaskInfo)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerDataMaskPolicyItem={");
+      super.toString(sb);
+      sb.append("dataMaskInfo={");
+      if (dataMaskInfo != null) {
+        dataMaskInfo.toString(sb);
+      }
+      sb.append("} ");
+      sb.append("}");
+      return sb;
+    }
+  }
+
+  /**
+   * RangerRowFilterPolicyItem class.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerRowFilterPolicyItem extends RangerPolicyItem implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+    private RangerPolicyItemRowFilterInfo rowFilterInfo;
+
+    public RangerRowFilterPolicyItem() {
+      this(null, null, null, null, null, null);
+    }
+
+    public RangerRowFilterPolicyItem(RangerPolicyItemRowFilterInfo rowFilterInfo,
+                                     List<RangerPolicyItemAccess> accesses, List<String> users, List<String> groups,
+                                     List<RangerPolicyItemCondition> conditions, Boolean delegateAdmin) {
+      super(accesses, users, groups, conditions, delegateAdmin);
+      setRowFilterInfo(rowFilterInfo);
+    }
+
+    /**
+     * @return the rowFilterInfo
+     */
+    public RangerPolicyItemRowFilterInfo getRowFilterInfo() {
+      return rowFilterInfo;
+    }
+
+    /**
+     * @param rowFilterInfo the rowFilterInfo to set
+     */
+    public void setRowFilterInfo(RangerPolicyItemRowFilterInfo rowFilterInfo) {
+      this.rowFilterInfo = rowFilterInfo == null ? new RangerPolicyItemRowFilterInfo() : rowFilterInfo;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((rowFilterInfo == null) ? 0 : rowFilterInfo.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!super.equals(obj)) {
+        return false;
+      }
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerRowFilterPolicyItem other = (RangerRowFilterPolicyItem) obj;
+      if (rowFilterInfo == null) {
+        if (other.rowFilterInfo != null) {
+          return false;
+        }
+      } else if (!rowFilterInfo.equals(other.rowFilterInfo)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerRowFilterPolicyItem={");
+      super.toString(sb);
+      sb.append("rowFilterInfo={");
+      if (rowFilterInfo != null) {
+        rowFilterInfo.toString(sb);
+      }
+      sb.append("} ");
+      sb.append("}");
+      return sb;
+    }
+  }
+
+  /**
+   * RangerPolicyItemAccess class.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyItemAccess implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String type;
+    private Boolean isAllowed;
+
+    public RangerPolicyItemAccess() {
+      this(null, null);
+    }
+
+    public RangerPolicyItemAccess(String type) {
+      this(type, null);
+    }
+
+    public RangerPolicyItemAccess(String type, Boolean isAllowed) {
+      setType(type);
+      setIsAllowed(isAllowed);
+    }
+
+    /**
+     * @return the type
+     */
+    public String getType() {
+      return type;
+    }
+
+    /**
+     * @param type the type to set
+     */
+    public void setType(String type) {
+      this.type = type;
+    }
+
+    /**
+     * @return the isAllowed
+     */
+    public Boolean getIsAllowed() {
+      return isAllowed;
+    }
+
+    /**
+     * @param isAllowed the isAllowed to set
+     */
+    public void setIsAllowed(Boolean isAllowed) {
+      this.isAllowed = isAllowed == null ? Boolean.TRUE : isAllowed;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyItemAccess={");
+      sb.append("type={").append(type).append("} ");
+      sb.append("isAllowed={").append(isAllowed).append("} ");
+      sb.append("}");
+      return sb;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((isAllowed == null) ? 0 : isAllowed.hashCode());
+      result = prime * result + ((type == null) ? 0 : type.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyItemAccess other = (RangerPolicyItemAccess) obj;
+      if (isAllowed == null) {
+        if (other.isAllowed != null) {
+          return false;
+        }
+      } else if (!isAllowed.equals(other.isAllowed)) {
+        return false;
+      }
+      if (type == null) {
+        if (other.type != null) {
+          return false;
+        }
+      } else if (!type.equals(other.type)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * RangerPolicyItemCondition class to store policy conditions.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyItemCondition implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String type;
+    private List<String> values;
+
+    public RangerPolicyItemCondition() {
+      this(null, null);
+    }
+
+    public RangerPolicyItemCondition(String type, List<String> values) {
+      setType(type);
+      setValues(values);
+    }
+
+    /**
+     * @return the type
+     */
+    public String getType() {
+      return type;
+    }
+
+    /**
+     * @param type the type to set
+     */
+    public void setType(String type) {
+      this.type = type;
+    }
+
+    /**
+     * @return the values
+     */
+    public List<String> getValues() {
+      return values;
+    }
+
+    /**
+     * @param values the values to set
+     */
+    public void setValues(List<String> values) {
+      if (this.values == null) {
+        this.values = new ArrayList<>();
+      }
+      if (this.values == values) {
+        return;
+      }
+      this.values.clear();
+      if (values != null) {
+        this.values.addAll(values);
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyItemCondition={");
+      sb.append("type={").append(type).append("} ");
+      sb.append("values={");
+      if (values != null) {
+        for (String value : values) {
+          sb.append(value).append(" ");
+        }
+      }
+      sb.append("} ");
+      sb.append("}");
+
+      return sb;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((type == null) ? 0 : type.hashCode());
+      result = prime * result + ((values == null) ? 0 : values.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyItemCondition other = (RangerPolicyItemCondition) obj;
+      if (type == null) {
+        if (other.type != null) {
+          return false;
+        }
+      } else if (!type.equals(other.type)) {
+        return false;
+      }
+      if (values == null) {
+        if (other.values != null) {
+          return false;
+        }
+      } else if (!values.equals(other.values)) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * RangerPolicyItemDataMaskInfo stores the data-masking info of a policy item.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyItemDataMaskInfo implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String dataMaskType;
+    private String conditionExpr;
+    private String valueExpr;
+
+    public RangerPolicyItemDataMaskInfo() {
+    }
+
+    public RangerPolicyItemDataMaskInfo(String dataMaskType, String conditionExpr, String valueExpr) {
+      setDataMaskType(dataMaskType);
+      setConditionExpr(conditionExpr);
+      setValueExpr(valueExpr);
+    }
+
+    public String getDataMaskType() {
+      return dataMaskType;
+    }
+
+    public void setDataMaskType(String dataMaskType) {
+      this.dataMaskType = dataMaskType;
+    }
+
+    public String getConditionExpr() {
+      return conditionExpr;
+    }
+
+    public void setConditionExpr(String conditionExpr) {
+      this.conditionExpr = conditionExpr;
+    }
+
+    public String getValueExpr() {
+      return valueExpr;
+    }
+
+    public void setValueExpr(String valueExpr) {
+      this.valueExpr = valueExpr;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((dataMaskType == null) ? 0 : dataMaskType.hashCode());
+      result = prime * result + ((conditionExpr == null) ? 0 : conditionExpr.hashCode());
+      result = prime * result + ((valueExpr == null) ? 0 : valueExpr.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyItemDataMaskInfo other = (RangerPolicyItemDataMaskInfo) obj;
+      if (dataMaskType == null) {
+        if (other.dataMaskType != null) {
+          return false;
+        }
+      } else if (!dataMaskType.equals(other.dataMaskType)) {
+        return false;
+      }
+      if (conditionExpr == null) {
+        if (other.conditionExpr != null) {
+          return false;
+        }
+      } else if (!conditionExpr.equals(other.conditionExpr)) {
+        return false;
+      }
+      if (valueExpr == null) {
+        if (other.valueExpr != null) {
+          return false;
+        }
+      } else if (!valueExpr.equals(other.valueExpr)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    private StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyItemDataMaskInfo={");
+      sb.append("dataMaskType={").append(dataMaskType).append("} ");
+      sb.append("conditionExpr={").append(conditionExpr).append("} ");
+      sb.append("valueExpr={").append(valueExpr).append("} ");
+      sb.append("}");
+      return sb;
+    }
+  }
+
+  /**
+   * RangerPolicyItemRowFilterInfo stores the row-filter info of a policy item.
+   */
+  @JsonAutoDetect(fieldVisibility = Visibility.ANY)
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  @XmlRootElement
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class RangerPolicyItemRowFilterInfo implements java.io.Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String filterExpr;
+
+    public RangerPolicyItemRowFilterInfo() {
+    }
+
+    public RangerPolicyItemRowFilterInfo(String filterExpr) {
+      setFilterExpr(filterExpr);
+    }
+
+    public String getFilterExpr() {
+      return filterExpr;
+    }
+
+    public void setFilterExpr(String filterExpr) {
+      this.filterExpr = filterExpr;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((filterExpr == null) ? 0 : filterExpr.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      RangerPolicyItemRowFilterInfo other = (RangerPolicyItemRowFilterInfo) obj;
+      if (filterExpr == null) {
+        if (other.filterExpr != null) {
+          return false;
+        }
+      } else if (!filterExpr.equals(other.filterExpr)) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+      sb.append("RangerPolicyItemDataMaskInfo={");
+      sb.append("filterExpr={").append(filterExpr).append("} ");
+      sb.append("}");
+      return sb;
+    }
+  }
+}
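
For illustration only (not part of the patch): a minimal sketch of how the nested value
classes above compose. The wrapper class name RangerPolicyItemSketch is hypothetical; only
the constructors and toString() implementations shown in this file are assumed.

    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;

    import java.util.Arrays;

    public class RangerPolicyItemSketch {
      public static void main(String[] args) {
        // "select" access; a null isAllowed would default to Boolean.TRUE in setIsAllowed().
        RangerPolicy.RangerPolicyItemAccess access =
            new RangerPolicy.RangerPolicyItemAccess("select", true);

        // A condition keeps a defensive copy of its values list.
        RangerPolicy.RangerPolicyItemCondition condition =
            new RangerPolicy.RangerPolicyItemCondition("ip-range", Arrays.asList("10.0.0.0/8"));

        // Row-filter info, wrapped by RangerRowFilterPolicyItem elsewhere in this class.
        RangerPolicy.RangerPolicyItemRowFilterInfo rowFilter =
            new RangerPolicy.RangerPolicyItemRowFilterInfo("country = 'US'");

        System.out.println(access);    // RangerPolicyItemAccess={type={select} isAllowed={true} }
        System.out.println(condition); // RangerPolicyItemCondition={type={ip-range} values={10.0.0.0/8 } }
        System.out.println(rowFilter);
      }
    }
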
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java
new file mode 100644
index 0000000..3de935a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * RangerPolicyList class to contain List of RangerPolicy objects.
+ */
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE,
+    fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class RangerPolicyList {
+  private static final long serialVersionUID = 1L;
+
+  private List<RangerPolicy> policies = new ArrayList<RangerPolicy>();
+
+  public RangerPolicyList() {
+    super();
+  }
+
+  public RangerPolicyList(List<RangerPolicy> objList) {
+    this.policies = objList;
+  }
+
+  public List<RangerPolicy> getPolicies() {
+    return policies;
+  }
+
+  public void setPolicies(List<RangerPolicy> policies) {
+    this.policies = policies;
+  }
+
+  public int getListSize() {
+    if (policies != null) {
+      return policies.size();
+    }
+    return 0;
+  }
+
+  public List<?> getList() {
+    return policies;
+  }
+
+}
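
For illustration only: RangerPolicyList serializes by field, so a Gson round trip behaves much
like the RangerExportPolicyList handling in the unit tests further down. The class name
RangerPolicyListSketch is hypothetical.

    import com.google.gson.Gson;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicyList;

    public class RangerPolicyListSketch {
      public static void main(String[] args) {
        // An empty wrapper reports a zero-sized list rather than null.
        RangerPolicyList empty = new RangerPolicyList();
        System.out.println(empty.getListSize()); // 0

        // Field-based (de)serialization with Gson, as RangerRestClientImpl does for the export payload.
        RangerPolicyList fromJson = new Gson().fromJson("{\"policies\":[]}", RangerPolicyList.class);
        System.out.println(fromJson.getListSize()); // 0
      }
    }
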
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java
new file mode 100644
index 0000000..eab20f4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.List;
+
+/**
+ * RangerRestClient to connect to the Ranger service and export/import policies.
+ */
+public interface RangerRestClient {
+  RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
+                                              String dbName, String rangerHiveServiceName) throws Exception;
+
+  RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
+                                              String baseUrl,
+                                              String rangerHiveServiceName) throws Exception;
+
+  List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies);
+
+  List<RangerPolicy> changeDataSet(List<RangerPolicy> rangerPolicies, String sourceDbName,
+                                   String targetDbName);
+
+  Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath,
+                                String fileName, HiveConf conf) throws Exception;
+
+  RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath,
+                                                        HiveConf conf) throws SemanticException;
+
+  boolean checkConnection(String url) throws Exception;
+}
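
A hedged sketch of how a dump-side caller might drive this interface; the authoritative flow
lives in RangerDumpTask elsewhere in this patch. The class name RangerDumpFlowSketch and the
method parameters are hypothetical, and the literal file name mirrors
ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;

    import java.util.List;

    public class RangerDumpFlowSketch {
      public static void dumpPolicies(RangerRestClient client, String rangerEndpoint, String dbName,
                                      String serviceName, Path dumpDir, HiveConf conf) throws Exception {
        if (!client.checkConnection(rangerEndpoint)) {
          throw new IllegalStateException("Ranger endpoint is not reachable: " + rangerEndpoint);
        }
        // Export everything matching the database, then keep only single-database policies.
        RangerExportPolicyList exported = client.exportRangerPolicies(rangerEndpoint, dbName, serviceName);
        List<RangerPolicy> singleDbPolicies = client.removeMultiResourcePolicies(exported.getPolicies());
        exported.setPolicies(singleDbPolicies);
        // Persist the filtered list under the dump directory for the load side to pick up.
        client.saveRangerPoliciesToFile(exported, dumpDir, "ranger_policies.json", conf);
      }
    }
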
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
new file mode 100644
index 0000000..c535f9e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.ranger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.multipart.FormDataMultiPart;
+import com.sun.jersey.multipart.MultiPart;
+import com.sun.jersey.multipart.file.StreamDataBodyPart;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.eclipse.jetty.util.MultiPartWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+
+/**
+ * RangerRestClientImpl to connect to Ranger and export/import policies.
+ */
+public class RangerRestClientImpl implements RangerRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RangerRestClientImpl.class);
+  private static final String RANGER_REST_URL_EXPORTJSONFILE = "/service/plugins/policies/exportJson";
+  private static final String RANGER_REST_URL_IMPORTJSONFILE =
+      "/service/plugins/policies/importPoliciesFromFile?updateIfExists=true";
+
+  public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
+                                                     String dbName,
+                                                     String rangerHiveServiceName)throws SemanticException {
+    LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint);
+    ClientResponse clientResp;
+    String uri;
+    if (StringUtils.isEmpty(rangerHiveServiceName)) {
+      throw new SemanticException("Ranger Service Name cannot be empty");
+    }
+    uri = RANGER_REST_URL_EXPORTJSONFILE + "?serviceName=" + rangerHiveServiceName + "&polResource="
+      + dbName + "&resource:database=" + dbName
+      + "&serviceType=hive&resourceMatchScope=self_or_ancestor&resourceMatch=full";
+    if (sourceRangerEndpoint.endsWith("/")) {
+      sourceRangerEndpoint = StringUtils.removePattern(sourceRangerEndpoint, "/+$");
+    }
+    String url = sourceRangerEndpoint + (uri.startsWith("/") ? uri : ("/" + uri));
+    LOG.debug("Url to export policies from source Ranger: {}", url);
+    RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList();
+    WebResource.Builder builder = getRangerResourceBuilder(url);
+    clientResp = builder.get(ClientResponse.class);
+
+    String response = null;
+    if (clientResp != null) {
+      if (clientResp.getStatus() == HttpServletResponse.SC_OK) {
+        Gson gson = new GsonBuilder().create();
+        response = clientResp.getEntity(String.class);
+        LOG.debug("Response received for ranger export {} ", response);
+        if (StringUtils.isNotEmpty(response)) {
+          rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class);
+          return rangerExportPolicyList;
+        }
+      } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+        LOG.debug("Ranger policy export request returned empty list");
+        return rangerExportPolicyList;
+      } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+        throw new SemanticException("Authentication Failure while communicating to Ranger admin");
+      } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) {
+        throw new SemanticException("Authorization Failure while communicating to Ranger admin");
+      }
+    }
+    if (StringUtils.isEmpty(response)) {
+      LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
+    }
+    return rangerExportPolicyList;
+  }
+
+  public List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies) {
+    List<RangerPolicy> rangerPoliciesToImport = new ArrayList<RangerPolicy>();
+    if (CollectionUtils.isNotEmpty(rangerPolicies)) {
+      Map<String, RangerPolicy.RangerPolicyResource> rangerPolicyResourceMap = null;
+      RangerPolicy.RangerPolicyResource rangerPolicyResource = null;
+      List<String> resourceNameList = null;
+      for (RangerPolicy rangerPolicy : rangerPolicies) {
+        if (rangerPolicy != null) {
+          rangerPolicyResourceMap = rangerPolicy.getResources();
+          if (rangerPolicyResourceMap != null) {
+            rangerPolicyResource = rangerPolicyResourceMap.get("database");
+            if (rangerPolicyResource != null) {
+              resourceNameList = rangerPolicyResource.getValues();
+              if (CollectionUtils.isNotEmpty(resourceNameList) && resourceNameList.size() == 1) {
+                rangerPoliciesToImport.add(rangerPolicy);
+              }
+            }
+          }
+        }
+      }
+    }
+    return rangerPoliciesToImport;
+  }
+
+  @Override
+  public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
+                                                     String baseUrl,
+                                                     String rangerHiveServiceName)
+      throws Exception {
+    String sourceClusterServiceName = null;
+    String serviceMapJsonFileName = "hive_servicemap.json";
+    String rangerPoliciesJsonFileName = "hive_replicationPolicies.json";
+    String uri = RANGER_REST_URL_IMPORTJSONFILE + "&polResource=" + dbName;
+
+    if (!rangerExportPolicyList.getPolicies().isEmpty()) {
+      sourceClusterServiceName = rangerExportPolicyList.getPolicies().get(0).getService();
+    }
+
+    if (StringUtils.isEmpty(sourceClusterServiceName)) {
+      sourceClusterServiceName = rangerHiveServiceName;
+    }
+
+    Map<String, String> serviceMap = new LinkedHashMap<String, String>();
+    if (!StringUtils.isEmpty(sourceClusterServiceName) && !StringUtils.isEmpty(rangerHiveServiceName)) {
+      serviceMap.put(sourceClusterServiceName, rangerHiveServiceName);
+    }
+
+    Gson gson = new GsonBuilder().create();
+    String jsonServiceMap = gson.toJson(serviceMap);
+
+    String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList);
+
+    String url = baseUrl
+        + (uri.startsWith("/") ? uri : ("/" + uri));
+
+    LOG.debug("URL to import policies on target Ranger: {}", url);
+    ClientResponse clientResp = null;
+
+    StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file",
+        new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)),
+        rangerPoliciesJsonFileName);
+    StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson",
+        new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName);
+
+    FormDataMultiPart formDataMultiPart = new FormDataMultiPart();
+    MultiPart multipartEntity = null;
+    try {
+      multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap);
+      WebResource.Builder builder = getRangerResourceBuilder(url);
+      clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA)
+          .post(ClientResponse.class, multipartEntity);
+      if (clientResp != null) {
+        if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
+          LOG.debug("Ranger policy import finished successfully");
+
+        } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
+          throw new Exception("Authentication Failure while communicating to Ranger admin");
+        } else {
+          throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs.");
+        }
+      }
+    } finally {
+      try {
+        if (filePartPolicies != null) {
+          filePartPolicies.cleanup();
+        }
+        if (filePartServiceMap != null) {
+          filePartServiceMap.cleanup();
+        }
+        if (formDataMultiPart != null) {
+          formDataMultiPart.close();
+        }
+        if (multipartEntity != null) {
+          multipartEntity.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Exception occurred while closing resources: {}", e);
+      }
+    }
+    return rangerExportPolicyList;
+  }
+
+  private synchronized Client getRangerClient() {
+    Client ret = null;
+    ClientConfig config = new DefaultClientConfig();
+    config.getClasses().add(MultiPartWriter.class);
+    config.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true);
+    ret = Client.create(config);
+    return ret;
+  }
+
+  @Override
+  public List<RangerPolicy> changeDataSet(List<RangerPolicy> rangerPolicies, String sourceDbName,
+                                          String targetDbName) {
+    if (sourceDbName.endsWith("/")) {
+      sourceDbName = StringUtils.removePattern(sourceDbName, "/+$");
+    }
+    if (targetDbName.endsWith("/")) {
+      targetDbName = StringUtils.removePattern(targetDbName, "/+$");
+    }
+    if (targetDbName.equals(sourceDbName)) {
+      return rangerPolicies;
+    }
+    if (CollectionUtils.isNotEmpty(rangerPolicies)) {
+      Map<String, RangerPolicy.RangerPolicyResource> rangerPolicyResourceMap = null;
+      RangerPolicy.RangerPolicyResource rangerPolicyResource = null;
+      List<String> resourceNameList = null;
+      for (RangerPolicy rangerPolicy : rangerPolicies) {
+        if (rangerPolicy != null) {
+          rangerPolicyResourceMap = rangerPolicy.getResources();
+          if (rangerPolicyResourceMap != null) {
+            rangerPolicyResource = rangerPolicyResourceMap.get("database");
+            if (rangerPolicyResource != null) {
+              resourceNameList = rangerPolicyResource.getValues();
+              if (CollectionUtils.isNotEmpty(resourceNameList)) {
+                for (int i = 0; i < resourceNameList.size(); i++) {
+                  String resourceName = resourceNameList.get(i);
+                  if (resourceName.equals(sourceDbName)) {
+                    resourceNameList.set(i, targetDbName);
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    return rangerPolicies;
+  }
+
+  private Path writeExportedRangerPoliciesToJsonFile(String jsonString, String fileName, Path stagingDirPath,
+                                                     HiveConf conf)
+      throws IOException {
+    String filePath = "";
+    Path newPath = null;
+    FSDataOutputStream outStream = null;
+    OutputStreamWriter writer = null;
+    try {
+      if (!StringUtils.isEmpty(jsonString)) {
+        FileSystem fileSystem = stagingDirPath.getFileSystem(conf);
+        if (fileSystem != null) {
+          if (!fileSystem.exists(stagingDirPath)) {
+            fileSystem.mkdirs(stagingDirPath);
+          }
+          newPath = stagingDirPath.suffix(Path.SEPARATOR + fileName);
+          outStream = fileSystem.create(newPath, true);
+          writer = new OutputStreamWriter(outStream, StandardCharsets.UTF_8);
+          writer.write(jsonString);
+        }
+      }
+    } catch (Exception ex) {
+      if (newPath != null) {
+        filePath = newPath.toString();
+      }
+      throw new IOException("Failed to write json string to file:" + filePath, ex);
+    } finally {
+      try {
+        if (writer != null) {
+          writer.close();
+        }
+        if (outStream != null) {
+          outStream.close();
+        }
+      } catch (Exception ex) {
+        throw new IOException("Unable to close writer/outStream.", ex);
+      }
+    }
+    return newPath;
+  }
+
+  @Override
+  public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath,
+                                       String fileName, HiveConf conf) throws Exception {
+    Gson gson = new GsonBuilder().create();
+    String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList);
+    Retry<Path> retriable = new Retry<Path>(IOException.class) {
+      @Override
+      public Path execute() throws IOException {
+        return writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName,
+            stagingDirPath, conf);
+      }
+    };
+    try {
+      return retriable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  @Override
+  public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath,
+                                                               HiveConf conf) throws SemanticException {
+    RangerExportPolicyList rangerExportPolicyList = null;
+    Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+    try {
+      FileSystem fs = filePath.getFileSystem(conf);
+      InputStream inputStream = fs.open(filePath);
+      try (Reader reader = new InputStreamReader(inputStream, Charset.forName("UTF-8"))) {
+        rangerExportPolicyList = gsonBuilder.fromJson(reader, RangerExportPolicyList.class);
+      }
+    } catch (Exception ex) {
+      throw new SemanticException("Error reading file :" + filePath, ex);
+    }
+    return rangerExportPolicyList;
+  }
+
+  @Override
+  public boolean checkConnection(String url) {
+    WebResource.Builder builder;
+    builder = getRangerResourceBuilder(url);
+    ClientResponse clientResp = builder.get(ClientResponse.class);
+    return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED);
+  }
+
+  private WebResource.Builder getRangerResourceBuilder(String url) {
+    Client client = getRangerClient();
+    WebResource webResource = client.resource(url);
+    WebResource.Builder builder = webResource.getRequestBuilder();
+    return builder;
+  }
+}
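
And a mirror-image load-side sketch (illustrative only, not the actual RangerLoadTask logic):
read the dumped JSON, retarget the database resource, then import into the replica's Ranger
service. RangerLoadFlowSketch and its parameters are hypothetical.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
    import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;

    import java.util.List;

    public class RangerLoadFlowSketch {
      public static void loadPolicies(Path policyFile, String sourceDb, String targetDb,
                                      String targetRangerEndpoint, String targetServiceName,
                                      HiveConf conf) throws Exception {
        RangerRestClient client = new RangerRestClientImpl();
        RangerExportPolicyList dumped = client.readRangerPoliciesFromJsonFile(policyFile, conf);
        if (dumped == null || dumped.getPolicies() == null || dumped.getPolicies().isEmpty()) {
          return; // nothing was exported for this database on the source
        }
        // Rewrite the "database" resource values from the source db name to the target db name.
        List<RangerPolicy> retargeted = client.changeDataSet(dumped.getPolicies(), sourceDb, targetDb);
        dumped.setPolicies(retargeted);
        client.importRangerPolicies(dumped, targetDb, targetRangerEndpoint, targetServiceName);
      }
    }
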
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 4fcee0e..377f742 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -79,6 +79,9 @@ public class ReplUtils {
   // Root base directory name for hive.
   public static final String REPL_HIVE_BASE_DIR = "hive";
 
+  // Root base directory name for ranger.
+  public static final String REPL_RANGER_BASE_DIR = "ranger";
+
   // Name of the directory which stores the list of tables included in the policy in case of table level replication.
   // One file per database, named after the db name. The directory is not created for db level replication.
   public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
@@ -100,6 +103,10 @@ public class ReplUtils {
 
   // Reserved number of items to accommodate operational files in the dump root dir.
   public static final int RESERVED_DIR_ITEMS_COUNT = 10;
+
+  public static final String RANGER_AUTHORIZER = "ranger";
+
+  public static final String HIVE_RANGER_POLICIES_FILE_NAME = "ranger_policies.json";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c4ff070..7959df2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -395,7 +395,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         } else {
           LOG.debug("{} contains an bootstrap dump", loadPath);
         }
-        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern,
+                replScope.getDbName(),
                 dmd.getReplScope(),
                 queryState.getLineageState(), evDump, dmd.getEventTo());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java
new file mode 100644
index 0000000..89cec53
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME;
+
+/**
+ * Unit test class for testing Ranger Dump.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestRangerDumpTask {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestRangerDumpTask.class);
+  private RangerDumpTask task;
+
+  @Mock
+  private RangerRestClientImpl mockClient;
+
+  @Mock
+  private HiveConf conf;
+
+  @Mock
+  private RangerDumpWork work;
+
+  @Before
+  public void setup() throws Exception {
+    task = new RangerDumpTask(mockClient, conf, work);
+    Mockito.when(mockClient.removeMultiResourcePolicies(Mockito.anyList())).thenCallRealMethod();
+    Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true);
+  }
+
+  @Test
+  public void testFailureInvalidAuthProviderEndpoint() throws Exception {
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn(null);
+    int status = task.execute();
+    Assert.assertEquals(40000, status);
+  }
+
+  @Test
+  public void testSuccessValidAuthProviderEndpoint() throws Exception {
+    RangerExportPolicyList rangerPolicyList = new RangerExportPolicyList();
+    rangerPolicyList.setPolicies(new ArrayList<RangerPolicy>());
+    Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+      .thenReturn(rangerPolicyList);
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint");
+    Mockito.when(conf.getVar(REPL_RANGER_SERVICE_NAME)).thenReturn("cm_hive");
+    Mockito.when(work.getDbName()).thenReturn("testdb");
+    int status = task.execute();
+    Assert.assertEquals(0, status);
+  }
+
+  @Test
+  public void testSuccessNonEmptyRangerPolicies() throws Exception {
+    String rangerResponse = "{\"metaDataInfo\":{\"Host name\":\"ranger.apache.org\","
+        + "\"Exported by\":\"hive\",\"Export time\":\"May 5, 2020, 8:55:03 AM\",\"Ranger apache version\""
+        + ":\"2.0.0.7.2.0.0-61\"},\"policies\":[{\"service\":\"cm_hive\",\"name\":\"db-level\",\"policyType\":0,"
+        + "\"description\":\"\",\"isAuditEnabled\":true,\"resources\":{\"database\":{\"values\":[\"aa\"],"
+        + "\"isExcludes\":false,\"isRecursive\":false},\"column\":{\"values\":[\"id\"],\"isExcludes\":false,"
+        + "\"isRecursive\":false},\"table\":{\"values\":[\"*\"],\"isExcludes\":false,\"isRecursive\":false}},"
+        + "\"policyItems\":[{\"accesses\":[{\"type\":\"select\",\"isAllowed\":true},{\"type\":\"update\","
+        + "\"isAllowed\":true}],\"users\":[\"admin\"],\"groups\":[\"public\"],\"conditions\":[],"
+        + "\"delegateAdmin\":false}],\"denyPolicyItems\":[],\"allowExceptions\":[],\"denyExceptions\":[],"
+        + "\"dataMaskPolicyItems\":[],\"rowFilterPolicyItems\":[],\"id\":40,\"guid\":"
+        + "\"4e2b3406-7b9a-4004-8cdf-7a239c8e2cae\",\"isEnabled\":true,\"version\":1}]}";
+    RangerExportPolicyList rangerPolicyList = new Gson().fromJson(rangerResponse, RangerExportPolicyList.class);
+    Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+      .thenReturn(rangerPolicyList);
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint");
+    Mockito.when(conf.getVar(REPL_RANGER_SERVICE_NAME)).thenReturn("cm_hive");
+    Mockito.when(work.getDbName()).thenReturn("testdb");
+    Path rangerDumpPath = new Path("/tmp");
+    Mockito.when(work.getCurrentDumpPath()).thenReturn(rangerDumpPath);
+    Path policyFile = new Path(rangerDumpPath, ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME);
+    Mockito.when(mockClient.saveRangerPoliciesToFile(rangerPolicyList, rangerDumpPath,
+      ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, conf)).thenReturn(policyFile);
+    int status = task.execute();
+    Assert.assertEquals(0, status);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
new file mode 100644
index 0000000..10c1afd
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT;
+
+/**
+ * Unit test class for testing Ranger Load.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestRangerLoadTask {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestRangerLoadTask.class);
+  private RangerLoadTask task;
+
+  @Mock
+  private RangerRestClientImpl mockClient;
+
+  @Mock
+  private HiveConf conf;
+
+  @Mock
+  private RangerLoadWork work;
+
+  @Before
+  public void setup() throws Exception {
+    task = new RangerLoadTask(mockClient, conf, work);
+    Mockito.when(mockClient.changeDataSet(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()))
+      .thenCallRealMethod();
+    Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true);
+  }
+
+  @Test
+  public void testFailureInvalidAuthProviderEndpoint() {
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn(null);
+    int status = task.execute();
+    Assert.assertEquals(40000, status);
+  }
+
+  @Test
+  public void testSuccessValidAuthProviderEndpoint() {
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint");
+    Mockito.when(work.getSourceDbName()).thenReturn("srcdb");
+    Mockito.when(work.getTargetDbName()).thenReturn("tgtdb");
+    int status = task.execute();
+    Assert.assertEquals(0, status);
+  }
+
+  @Test
+  public void testSuccessNonEmptyRangerPolicies() throws Exception {
+    String rangerResponse = "{\"metaDataInfo\":{\"Host name\":\"ranger.apache.org\","
+        + "\"Exported by\":\"hive\",\"Export time\":\"May 5, 2020, 8:55:03 AM\",\"Ranger apache version\""
+        + ":\"2.0.0.7.2.0.0-61\"},\"policies\":[{\"service\":\"cm_hive\",\"name\":\"db-level\",\"policyType\":0,"
+        + "\"description\":\"\",\"isAuditEnabled\":true,\"resources\":{\"database\":{\"values\":[\"aa\"],"
+        + "\"isExcludes\":false,\"isRecursive\":false},\"column\":{\"values\":[\"id\"],\"isExcludes\":false,"
+        + "\"isRecursive\":false},\"table\":{\"values\":[\"*\"],\"isExcludes\":false,\"isRecursive\":false}},"
+        + "\"policyItems\":[{\"accesses\":[{\"type\":\"select\",\"isAllowed\":true},{\"type\":\"update\","
+        + "\"isAllowed\":true}],\"users\":[\"admin\"],\"groups\":[\"public\"],\"conditions\":[],"
+        + "\"delegateAdmin\":false}],\"denyPolicyItems\":[],\"allowExceptions\":[],\"denyExceptions\":[],"
+        + "\"dataMaskPolicyItems\":[],\"rowFilterPolicyItems\":[],\"id\":40,\"guid\":"
+        + "\"4e2b3406-7b9a-4004-8cdf-7a239c8e2cae\",\"isEnabled\":true,\"version\":1}]}";
+    RangerExportPolicyList rangerPolicyList = new Gson().fromJson(rangerResponse, RangerExportPolicyList.class);
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint");
+    Mockito.when(work.getSourceDbName()).thenReturn("srcdb");
+    Mockito.when(work.getTargetDbName()).thenReturn("tgtdb");
+    Path rangerDumpPath = new Path("/tmp");
+    Mockito.when(work.getCurrentDumpPath()).thenReturn(rangerDumpPath);
+    mockClient.saveRangerPoliciesToFile(rangerPolicyList,
+        rangerDumpPath, ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, new HiveConf());
+    Mockito.when(mockClient.readRangerPoliciesFromJsonFile(Mockito.any(), Mockito.any())).thenReturn(rangerPolicyList);
+    int status = task.execute();
+    Assert.assertEquals(0, status);
+  }
+}