Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/05/17 12:51:40 UTC

[GitHub] [hive] pkumarsinha opened a new pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

pkumarsinha opened a new pull request #1021:
URL: https://github.com/apache/hive/pull/1021


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426663392



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  // Tests only the Atlas configs and verifies no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+    confMap.remove("hive.repl.atlas.replicatedto");

Review comment:
       It is ignored during load






[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427025689



##########
File path: pom.xml
##########
@@ -127,6 +127,7 @@
     <apache-directory-server.version>1.5.7</apache-directory-server.version>
     <!-- Include arrow for LlapOutputFormatService -->
     <arrow.version>0.10.0</arrow.version>
+    <atlas.client.version>2.0.0</atlas.client.version>

Review comment:
       yes






[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426672628



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements AtlasRESTClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRESTClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRESTClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private static <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Future<T> future = executor.submit(callable);
+    executor.shutdown();
+    try {
+      return future.get(timeout, timeUnit);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      throw e;
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof Exception) {
+        throw (Exception) t;
+      } else {
+        throw new IllegalStateException(t);
+      }
+    }
+  }
+
+  public InputStream exportData(AtlasExportRequest request) throws Exception {
+    LOG.debug("exportData: {}" + request);
+    return invokeWithRetry(new Callable<InputStream>() {
+      @Override
+      public InputStream call() throws Exception {
+        return clientV2.exportData(request);
+      }
+    }, null);
+  }
+
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportResult defaultResult = getDefaultAtlasImportResult(request);
+    Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+    FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf());
+    if (!fs.exists(exportFilePath)) {
+      return defaultResult;
+    }
+    LOG.debug("HiveAtlasPlugin:importData: {}" + request);
+    return invokeWithRetry(new Callable<AtlasImportResult>() {
+      @Override
+      public AtlasImportResult call() throws Exception {
+        InputStream is = null;
+        try {
+          is = fs.open(exportFilePath);
+          return clientV2.importData(request, is);
+        } finally {
+          if (is != null) {
+            is.close();
+          }
+        }
+      }
+    }, defaultResult);
+  }
+
+  private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) {

Review comment:
       Using it in AtlasRESTClientImpl.






[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427023450



##########
File path: pom.xml
##########
@@ -127,6 +127,7 @@
     <apache-directory-server.version>1.5.7</apache-directory-server.version>
     <!-- Include arrow for LlapOutputFormatService -->
     <arrow.version>0.10.0</arrow.version>
+    <atlas.client.version>2.0.0</atlas.client.version>

Review comment:
       Is this the latest version?






[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426596518



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasProcess atlasProcess = new AtlasExportProcess();
+      String entityGuid = atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+              atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+      long currentModifiedTime = atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+      atlasProcess.run(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception during AtlasDumpTask.execute", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+    Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    atlasReplInfo.getSrcFsUri(),
+                    String.valueOf(lastModifiedTime)
+            )
+    );
+    Utils.writeOutput(listValues, dumpFile, conf, true);
+    LOG.debug("Stored metadata for Atlas dump at:", dumpFile.toString());
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.ATLAS_REPL_DUMP;
+  }
+
+  @Override
+  public String getName() {
+    return "ATLAS_DUMP_TASK";

Review comment:
       Name and stage type are generally the same for all tasks.
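
       For illustration, a minimal sketch of that convention, deriving the name from the
       stage type so the two cannot drift apart (this derivation is an assumption, not
       the PR's actual fix):

           @Override
           public StageType getType() {
             return StageType.ATLAS_REPL_DUMP;
           }

           @Override
           public String getName() {
             // Hypothetical: reuse the enum name instead of a separate string literal.
             return StageType.ATLAS_REPL_DUMP.name();
           }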

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  // Tests only the Atlas configs and verifies no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+    confMap.remove("hive.repl.atlas.replicatedto");

Review comment:
       What happens if it's set on the target cluster as well?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {

Review comment:
       Can this info be part of the AtlasDumpWork? 
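
       A minimal sketch of folding it into the work object, building the info once and
       caching it (the field and the helper name buildAtlasReplInfo are hypothetical;
       the construction logic itself stays as in the diff):

           private transient AtlasReplInfo atlasReplInfo;

           public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
             if (atlasReplInfo == null) {
               // Runs the existing construction logic exactly once and memoizes it.
               atlasReplInfo = buildAtlasReplInfo();
             }
             return atlasReplInfo;
           }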

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String tgtDB = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+    atlasReplInfo.setTimeStamp(lastTimeStamp);
+    return atlasReplInfo;
+  }
+
+  private long lastStoredTimeStamp() throws SemanticException {
+    Path prevMetadataPath = new Path(getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = prevMetadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return Long.parseLong(lineContents[1]);
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {
+    String val = conf.get(config);
+    if (StringUtils.isEmpty(val)) {
+      throw new SemanticException(config + " is mandatory config for Atlas metadata replication");
+    }
+    return val;
+  }
+
+  public boolean isBootstrap() {
+    return bootstrap;
+  }
+
+  public Path getPrevAtlasDumpDir() {

Review comment:
       why is this needed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String tgtDB = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+    atlasReplInfo.setTimeStamp(lastTimeStamp);
+    return atlasReplInfo;
+  }
+
+  private long lastStoredTimeStamp() throws SemanticException {
+    Path prevMetadataPath = new Path(getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = prevMetadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return Long.parseLong(lineContents[1]);
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {
+    String val = conf.get(config);
+    if (StringUtils.isEmpty(val)) {
+      throw new SemanticException(config + " is mandatory config for Atlas metadata replication");
+    }
+    return val;
+  }
+
+  public boolean isBootstrap() {

Review comment:
       Why is this different for bootstrap and incremental?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+
+  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.tgtDB = tgtDB;
+    this.stagingDir = stagingDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+    atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    return atlasReplInfo;
+  }
+
+  private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
+    Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = metadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return lineContents[0];
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {

Review comment:
       This can be moved to a common place.
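
       A sketch of that refactor, assuming the helper is hoisted into a shared utility
       class such as ReplUtils (the placement is an assumption; the body is the diff's):

           public static String getNonEmpty(String config, HiveConf conf) throws SemanticException {
             String val = conf.get(config);
             if (StringUtils.isEmpty(val)) {
               throw new SemanticException(config + " is a mandatory config for Atlas metadata replication");
             }
             return val;
           }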

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+
+  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.tgtDB = tgtDB;
+    this.stagingDir = stagingDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+    atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    return atlasReplInfo;
+  }
+
+  private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {

Review comment:
       This can be a common method.
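
       Likewise, a sketch of a shared reader for the tab-separated metadata line written
       by the dump side (the method name, placement, and try-with-resources form are
       assumptions; the parsing mirrors the diff):

           public static String[] readDumpMetadata(Path dumpDir, HiveConf conf) throws SemanticException {
             Path metadataPath = new Path(dumpDir, EximUtil.METADATA_NAME);
             try (BufferedReader br = new BufferedReader(new InputStreamReader(
                 metadataPath.getFileSystem(conf).open(metadataPath), Charset.defaultCharset()))) {
               // Line format per the dump side: srcFsUri <TAB> lastModifiedTime
               return br.readLine().split("\t", 5);
             } catch (Exception ex) {
               throw new SemanticException(ex);
             }
           }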

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClient.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Atlas RESTClient interface for Atlas' REST APIs.
+ */
+public interface AtlasRESTClient {

Review comment:
       Follow the class naming convention.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/DummyAtlasRESTClient.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+/**
+ * Dummy implementation of RESTClient, encapsulates Atlas' REST APIs.
+ * To be used for testing.
+ */
+public class DummyAtlasRESTClient implements AtlasRESTClient {

Review comment:
       Could be named NoOp instead of Dummy.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasProcess.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for Atlas Processes, viz. Export & Import
+ */
+public abstract class AtlasProcess {
+  private static final String CLUSTER_NAME_SEPARATOR = "$";

Review comment:
       This can go into a utils class.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasImportProcess.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs Atlas' Import.
+ */
+public class AtlasImportProcess extends AtlasProcess {
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasImportProcess.class);

Review comment:
       This can be part of AtlasLoadTask.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -107,6 +107,9 @@ public int execute() {
       }
       work.setRootTask(this);
       this.parentTasks = null;
+      if (shouldLoadAtlasMetadata()) {

Review comment:
       Should this come after the Ranger load?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasProcess atlasProcess = new AtlasExportProcess();
+      String entityGuid = atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+              atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+      long currentModifiedTime = atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+      atlasProcess.run(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception during AtlasDumpTask.execute", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {

Review comment:
       Add a test for this.
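
       A sketch of such a test, assuming access to the staging directory and conf used
       by the dump (stagingDir and conf are hypothetical fixtures, not from the PR):

           @Test
           public void testAtlasDumpMetadataFile() throws Throwable {
             Path dumpFile = new Path(stagingDir, EximUtil.METADATA_NAME);
             FileSystem fs = dumpFile.getFileSystem(conf);
             try (BufferedReader br = new BufferedReader(
                 new InputStreamReader(fs.open(dumpFile), Charset.defaultCharset()))) {
               String[] contents = br.readLine().split("\t", 5);
               assertEquals(conf.get(ReplUtils.DEFAULT_FS_CONFIG), contents[0]); // stored source FS URI
               assertTrue(Long.parseLong(contents[1]) >= 0);                     // stored modification timestamp
             }
           }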

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasExportProcess.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Performs Atlas metadata export.
+ */
+public class AtlasExportProcess extends AtlasProcess {
+  private FileSystem fileSystem = null;
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasExportProcess.class);
+  private static final int DEF_BUF_SIZE = 8 * 1024;
+
+  public void run(AtlasReplInfo atlasReplInfo) throws SemanticException {

Review comment:
       This can be part of AtlasDumpTask; a separate process isn't needed for it.
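
       A sketch of that inlining, with the export body moved into AtlasDumpTask
       (hypothetical placement; the helper calls mirror the AtlasExportProcess diff):

           // Inside AtlasDumpTask (hypothetical):
           private void exportAtlasMetadata(AtlasReplInfo info) throws Exception {
             AtlasExportRequest request = atlasRequestBuilder.createExportRequest(
                 info, getAtlasClusterName(info.getSrcCluster()));
             InputStream is = exportData(info.getAtlasEndpoint(), request, info.getConf());
             Path exportFile = new Path(info.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
             writeDataToFile(getFileSystem(info), exportFile, is);
           }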

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements AtlasRESTClient {

Review comment:
       Follow the class naming convention.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -149,6 +149,10 @@ public int execute() {
           if (shouldDumpAuthorizationMetadata()) {
             initiateAuthorizationDumpTask();
           }
+          if (shouldDumpAtlasMetadata()) {

Review comment:
       Should this come before the Ranger dump?






[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427213821



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("HiveAtlasPlugin: authStrategy: {} : urls: {}: userGroupInformation: {}",
+              authStrategy, baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+    }
+    return this;
+  }
+
+  public AtlasRESTClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setAuthStrategy();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2;
+    LOG.info("HiveAtlasPlugin: authStrategyUsed: {} : {}", authStrategy, baseUrls);
+    switch (authStrategy) {
+      case KERBEROS:

Review comment:
       done
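
       For reference, a hypothetical use of the builder as it appears in the diff (the
       endpoint URL is illustrative, not from the PR):

           AtlasRESTClient client = new RESTClientBuilder()
               .baseUrl("http://atlas-host:21000")
               .create();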






[GitHub] [hive] github-actions[bot] closed pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #1021:
URL: https://github.com/apache/hive/pull/1021


   




[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427228411



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasExportProcess.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Performs Atlas metadata export.
+ */
+public class AtlasExportProcess extends AtlasProcess {
+  private FileSystem fileSystem = null;
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasExportProcess.class);
+  private static final int DEF_BUF_SIZE = 8 * 1024;
+
+  public void run(AtlasReplInfo atlasReplInfo) throws SemanticException {
+    LOG.info("HiveAtlasPlugin: Starting export from:{}", atlasReplInfo.getStagingDir());
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              getAtlasClusterName(atlasReplInfo.getSrcCluster()));
+      InputStream inputStream = exportData(atlasReplInfo.getAtlasEndpoint(), exportRequest, atlasReplInfo.getConf());
+      FileSystem fs = getFileSystem(atlasReplInfo);
+      Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+      writeDataToFile(fs, exportFilePath, inputStream);
+    } catch (SemanticException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    }
+  }
+
+  FileSystem getFileSystem(AtlasReplInfo atlasReplInfo) throws IOException {
+    if (fileSystem != null) {
+      return fileSystem;
+    }
+    return FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
+  }
+
+  protected InputStream exportData(String atlasEndpoint, AtlasExportRequest request, HiveConf conf) throws Exception {
+    return getClient(atlasEndpoint, conf).exportData(request);
+  }
+
+  private void writeDataToFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {
+    long numBytesWritten = writeFile(fs, exportFilePath, is);
+    LOG.info("HiveAtlasPlugin: writing to {} ({} bytes)", exportFilePath, numBytesWritten);
+  }
+
+  private long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {

Review comment:
       done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/DummyAtlasRESTClient.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+/**
+ * Dummy implementation of RESTClient, encapsulates Atlas' REST APIs.
+ * To be used for testing.
+ */
+public class DummyAtlasRESTClient implements AtlasRESTClient {

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements AtlasRESTClient {

Review comment:
       Done






[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426672007



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasProcess.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for Atlas Processes, viz. Export & Import
+ */
+public abstract class AtlasProcess {
+  private static final String CLUSTER_NAME_SEPARATOR = "$";
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasProcess.class);
+
+  private RESTClientBuilder builder = new RESTClientBuilder();
+  protected AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+
+  protected AtlasRESTClient getClient(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return new DummyAtlasRESTClient();
+    }
+    return builder.baseUrl(atlasEndpoint).create();
+  }
+
+  public String getEntityGuid(String atlasEndpoint, String typeName, String attributeName,
+                              String attributeValue, HiveConf conf) throws SemanticException {
+    return getClient(atlasEndpoint, conf).getEntityGuid(typeName, attributeName, attributeValue);
+  }
+
+  public boolean getStatus(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    return getClient(atlasEndpoint, conf).getStatus();
+  }
+
+  public abstract void run(AtlasReplInfo atlasReplInfo) throws SemanticException;
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasRESTClient client = getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+    AtlasServer atlasServer = client.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public String checkHiveEntityGuid(String atlasEndpoint, String fullyQualifiedClusterName, String srcDb, HiveConf conf)
+          throws SemanticException {
+    String clusterName = getAtlasClusterName(fullyQualifiedClusterName);
+    AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
+    Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
+    if (entries == null || entries.isEmpty()) {
+      throw new SemanticException("HiveAtlasPlugin: Could find entries in objectId for:" + clusterName);
+    }
+    Map.Entry<String, Object> item = entries.iterator().next();
+    String guid = getEntityGuid(atlasEndpoint, objectId.getTypeName(), item.getKey(), (String) item.getValue(), conf);
+    if (guid == null || guid.isEmpty()) {
+      throw new SemanticException("HiveAtlasPlugin: Entity not found:" + objectId);
+    }
+    return guid;
+  }
+
+  protected static String getAtlasClusterName(String clusterName) {

Review comment:
       Yes, it's needed if the orchestration engine adds a DC prefix to the cluster name. Have retained that behavior.
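
       For illustration, a minimal sketch of that separator handling, assuming the engine passes names of the form "dc$clusterName" (which part of the name precedes the separator is an assumption here, not confirmed by this diff):

           protected static String getAtlasClusterName(String clusterName) {
             // Hypothetical: strip an orchestration-engine DC prefix such as "dc$" if present.
             int idx = clusterName.lastIndexOf(CLUSTER_NAME_SEPARATOR);
             return idx < 0 ? clusterName : clusterName.substring(idx + 1);
           }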






[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426639318



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasProcess.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for Atlas Processes, viz. Export & Import
+ */
+public abstract class AtlasProcess {
+  private static final String CLUSTER_NAME_SEPARATOR = "$";

Review comment:
       Can go to utils or be part of the rest client






[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427539353



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -107,6 +107,9 @@ public int execute() {
       }
       work.setRootTask(this);
       this.parentTasks = null;
+      if (shouldLoadAtlasMetadata()) {

Review comment:
       Doing it prior to Ranger is fine, and it can also help later if tag-based Ranger policies come in.
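
       As a rough sketch, the guard itself could simply key off the replication conf; the key below is the one exercised in the tests, though the final constant may differ:

           private boolean shouldLoadAtlasMetadata() {
             // Gate the Atlas metadata load on a repl-scoped flag so regular loads are unaffected.
             return conf.getBoolean("hive.repl.include.atlas.metadata", false);
           }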






[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426648656



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasExportProcess.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Performs Atlas metadata export.
+ */
+public class AtlasExportProcess extends AtlasProcess {
+  private FileSystem fileSystem = null;
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasExportProcess.class);
+  private static final int DEF_BUF_SIZE = 8 * 1024;
+
+  public void run(AtlasReplInfo atlasReplInfo) throws SemanticException {
+    LOG.info("HiveAtlasPlugin: Starting export from:{}", atlasReplInfo.getStagingDir());
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              getAtlasClusterName(atlasReplInfo.getSrcCluster()));
+      InputStream inputStream = exportData(atlasReplInfo.getAtlasEndpoint(), exportRequest, atlasReplInfo.getConf());
+      FileSystem fs = getFileSystem(atlasReplInfo);
+      Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+      writeDataToFile(fs, exportFilePath, inputStream);
+    } catch (SemanticException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    }
+  }
+
+  FileSystem getFileSystem(AtlasReplInfo atlasReplInfo) throws IOException {
+    if (fileSystem != null) {
+      return fileSystem;
+    }
+    return FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
+  }
+
+  protected InputStream exportData(String atlasEndpoint, AtlasExportRequest request, HiveConf conf) throws Exception {
+    return getClient(atlasEndpoint, conf).exportData(request);
+  }
+
+  private void writeDataToFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {
+    long numBytesWritten = writeFile(fs, exportFilePath, is);
+    LOG.info("HiveAtlasPlugin: writing to {} ({} bytes)", exportFilePath, numBytesWritten);
+  }
+
+  private long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {

Review comment:
       can be part of utils
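
       If it does move to a utils class, a minimal sketch of the copy loop (buffer size mirrors DEF_BUF_SIZE above; the method name and placement are suggestions only):

           public static long copyToFile(FileSystem fs, Path target, InputStream is, int bufSize) throws IOException {
             long total = 0;
             byte[] buf = new byte[bufSize];
             // Overwrite any partial file from a previous attempt, then stream the export payload out.
             try (FSDataOutputStream os = fs.create(target, true)) {
               int read;
               while ((read = is.read(buf)) != -1) {
                 os.write(buf, 0, read);
                 total += read;
               }
             }
             return total;
           }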

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasProcess.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for Atlas Processes, viz. Export & Import
+ */
+public abstract class AtlasProcess {
+  private static final String CLUSTER_NAME_SEPARATOR = "$";
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasProcess.class);
+
+  private RESTClientBuilder builder = new RESTClientBuilder();
+  protected AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+
+  protected AtlasRESTClient getClient(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return new DummyAtlasRESTClient();
+    }
+    return builder.baseUrl(atlasEndpoint).create();
+  }
+
+  public String getEntityGuid(String atlasEndpoint, String typeName, String attributeName,
+                              String attributeValue, HiveConf conf) throws SemanticException {
+    return getClient(atlasEndpoint, conf).getEntityGuid(typeName, attributeName, attributeValue);
+  }
+
+  public boolean getStatus(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    return getClient(atlasEndpoint, conf).getStatus();
+  }
+
+  public abstract void run(AtlasReplInfo atlasReplInfo) throws SemanticException;
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasRESTClient client = getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+    AtlasServer atlasServer = client.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public String checkHiveEntityGuid(String atlasEndpoint, String fullyQualifiedClusterName, String srcDb, HiveConf conf)
+          throws SemanticException {
+    String clusterName = getAtlasClusterName(fullyQualifiedClusterName);
+    AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb);
+    Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet();
+    if (entries == null || entries.isEmpty()) {
+      throw new SemanticException("HiveAtlasPlugin: Could find entries in objectId for:" + clusterName);
+    }
+    Map.Entry<String, Object> item = entries.iterator().next();
+    String guid = getEntityGuid(atlasEndpoint, objectId.getTypeName(), item.getKey(), (String) item.getValue(), conf);
+    if (guid == null || guid.isEmpty()) {
+      throw new SemanticException("HiveAtlasPlugin: Entity not found:" + objectId);
+    }
+    return guid;
+  }
+
+  protected static String getAtlasClusterName(String clusterName) {

Review comment:
       is this needed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasProcess atlasProcess = new AtlasExportProcess();
+      String entityGuid = atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+              atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+      long currentModifiedTime = atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+      atlasProcess.run(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception during AtlasDumpTask.execute", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+    Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    atlasReplInfo.getSrcFsUri(),
+                    String.valueOf(lastModifiedTime)
+            )
+    );
+    Utils.writeOutput(listValues, dumpFile, conf, true);

Review comment:
       is retry present?
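
       If it isn't, a simple bounded retry around the write could cover transient FS hiccups; a sketch only, with placeholder retry count and backoff (the patch may instead rely on retry inside Utils.writeOutput itself):

           private void writeWithRetry(List<List<String>> values, Path file, HiveConf conf) throws SemanticException {
             int attemptsLeft = 3;      // placeholder retry budget
             long backoffMs = 1000L;    // placeholder initial backoff
             while (true) {
               try {
                 Utils.writeOutput(values, file, conf, true);
                 return;
               } catch (SemanticException e) {
                 if (--attemptsLeft == 0) {
                   throw e;
                 }
                 try {
                   Thread.sleep(backoffMs);
                 } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
                   throw e;
                 }
                 backoffMs *= 2;        // simple exponential backoff
               }
             }
           }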

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasExportProcess.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Performs Atlas metadata export.
+ */
+public class AtlasExportProcess extends AtlasProcess {
+  private FileSystem fileSystem = null;
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasExportProcess.class);
+  private static final int DEF_BUF_SIZE = 8 * 1024;
+
+  public void run(AtlasReplInfo atlasReplInfo) throws SemanticException {
+    LOG.info("HiveAtlasPlugin: Starting export from:{}", atlasReplInfo.getStagingDir());
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              getAtlasClusterName(atlasReplInfo.getSrcCluster()));
+      InputStream inputStream = exportData(atlasReplInfo.getAtlasEndpoint(), exportRequest, atlasReplInfo.getConf());
+      FileSystem fs = getFileSystem(atlasReplInfo);
+      Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+      writeDataToFile(fs, exportFilePath, inputStream);
+    } catch (SemanticException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    }
+  }
+
+  FileSystem getFileSystem(AtlasReplInfo atlasReplInfo) throws IOException {
+    if (fileSystem != null) {
+      return fileSystem;
+    }
+    return FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
+  }
+
+  protected InputStream exportData(String atlasEndpoint, AtlasExportRequest request, HiveConf conf) throws Exception {
+    return getClient(atlasEndpoint, conf).exportData(request);
+  }
+
+  private void writeDataToFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {
+    long numBytesWritten = writeFile(fs, exportFilePath, is);
+    LOG.info("HiveAtlasPlugin: writing to {} ({} bytes)", exportFilePath, numBytesWritten);
+  }
+
+  private long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws IOException {

Review comment:
       retry

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {

Review comment:
       This will always be Kerberos, so there is no need to infer it.
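
       With Kerberos as the only strategy, the enum and switch could collapse into something like this (a sketch assembled from the calls already in this diff, not the final code):

           public AtlasRESTClient create() throws SemanticException {
             if (baseUrls == null || baseUrls.length == 0) {
               throw new SemanticException("baseUrls is not set.");
             }
             try {
               // Kerberos is the only supported auth, so resolve the login user directly.
               this.userGroupInformation = UserGroupInformation.getLoginUser();
             } catch (Exception e) {
               throw new SemanticException("Error: UserGroupInformation.getLoginUser: failed!", e);
             }
             initializeAtlasApplicationProperties();
             AtlasClientV2 clientV2 = new AtlasClientV2(userGroupInformation,
                     userGroupInformation.getShortUserName(), baseUrls);
             return new AtlasRESTClientImpl(clientV2);
           }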

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements AtlasRESTClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRESTClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRESTClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private static <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Future<T> future = executor.submit(callable);
+    executor.shutdown();
+    try {
+      return future.get(timeout, timeUnit);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      throw e;
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof Exception) {
+        throw (Exception) t;
+      } else {
+        throw new IllegalStateException(t);
+      }
+    }
+  }
+
+  public InputStream exportData(AtlasExportRequest request) throws Exception {
+    LOG.debug("exportData: {}" + request);
+    return invokeWithRetry(new Callable<InputStream>() {
+      @Override
+      public InputStream call() throws Exception {
+        return clientV2.exportData(request);
+      }
+    }, null);
+  }
+
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportResult defaultResult = getDefaultAtlasImportResult(request);
+    Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+    FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf());
+    if (!fs.exists(exportFilePath)) {
+      return defaultResult;
+    }
+    LOG.debug("HiveAtlasPlugin:importData: {}" + request);
+    return invokeWithRetry(new Callable<AtlasImportResult>() {
+      @Override
+      public AtlasImportResult call() throws Exception {
+        InputStream is = null;
+        try {
+          is = fs.open(exportFilePath);
+          return clientV2.importData(request, is);
+        } finally {
+          if (is != null) {
+            is.close();
+          }
+        }
+      }
+    }, defaultResult);
+  }
+
+  private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) {

Review comment:
       Can this be part of NoOp client?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("HiveAtlasPlugin: authStrategy: {} : urls: {}: userGroupInformation: {}",
+              authStrategy, baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+    }
+    return this;
+  }
+
+  public AtlasRESTClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setAuthStrategy();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2;
+    LOG.info("HiveAtlasPlugin: authStrategyUsed: {} : {}", authStrategy, baseUrls);
+    switch (authStrategy) {
+      case KERBEROS:
+        clientV2 = new AtlasClientV2(this.userGroupInformation,
+                this.userGroupInformation.getShortUserName(), baseUrls);
+        return new AtlasRESTClientImpl(clientV2);
+      default:
+        throw new SemanticException("AtlasRESTClient: unsupported auth strategy:" + authStrategy);
+    }
+  }
+
+  private void initializeAtlasApplicationProperties() throws SemanticException {
+    try {
+      ApplicationProperties.set(getClientProperties());

Review comment:
       Does this have to be static?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("HiveAtlasPlugin: authStrategy: {} : urls: {}: userGroupInformation: {}",
+              authStrategy, baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+    }
+    return this;
+  }
+
+  public AtlasRESTClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setAuthStrategy();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2;
+    LOG.info("HiveAtlasPlugin: authStrategyUsed: {} : {}", authStrategy, baseUrls);
+    switch (authStrategy) {
+      case KERBEROS:

Review comment:
       Only Kerberos is supported






[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427194647



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {

Review comment:
       Do you need this to be public?
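
       If the only external caller is a test, package-private plus Guava's @VisibleForTesting (already on Hive's classpath) would be the usual middle ground; a sketch using the body as quoted further below:

           @VisibleForTesting   // com.google.common.annotations.VisibleForTesting
           long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
             AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
             long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
                     ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
             LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
             return ret;
           }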

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1568,123 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+    verifyAtlasMetadataPresent();
+
+    confMap.remove("hive.repl.atlas.replicatedto");
+    replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+  }
+
+  @Test
+  public void testAtlasMissingConfigs() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)");
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");

Review comment:
       use constants

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1568,123 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+    verifyAtlasMetadataPresent();
+
+    confMap.remove("hive.repl.atlas.replicatedto");
+    replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+  }
+
+  @Test
+  public void testAtlasMissingConfigs() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)");
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");
+    confMap.put("hive.repl.include.atlas.metadata", "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.endpoint", true);
+    confMap.put("hive.repl.atlas.endpoint", "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.replicatedto", true);
+    confMap.put("hive.repl.atlas.replicatedto", replicatedDbName);
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.source.cluster.name", true);
+    confMap.put("hive.repl.source.cluster.name", "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.target.cluster.name", true);
+    confMap.put("hive.repl.target.cluster.name", "cluster1");
+    primary.dump(primaryDbName, getAtlasClause(confMap));
+    verifyAtlasMetadataPresent();
+    confMap.clear();
+    confMap.put("hive.in.test", "true");
+    confMap.put("hive.repl.include.atlas.metadata", "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.endpoint", false);
+    confMap.put("hive.repl.atlas.endpoint", "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.source.cluster.name", false);
+    confMap.put("hive.repl.source.cluster.name", "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.target.cluster.name", false);
+    confMap.put("hive.repl.target.cluster.name", "cluster1");
+    primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap));
+  }
+
+  private void verifyAtlasMetadataPresent() throws IOException {
+    Path dbReplDir = new Path(primary.repldDir,
+            Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8)));
+    FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf());
+    assertTrue(fs.exists(dbReplDir));
+    FileStatus[] dumpRoots = fs.listStatus(dbReplDir);
+    assertEquals(1, dumpRoots.length);
+    Path dumpRoot = dumpRoots[0].getPath();
+    assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, ReplUtils.REPL_HIVE_BASE_DIR)));
+    Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR);
+    assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot));
+    assertTrue("Atlas export file doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME)));
+    assertTrue("Atlas dump metadata doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)));
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(
+            fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset()))) {
+      String[] lineContents = br.readLine().split("\t", 5);
+      assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]);
+      assertEquals(0, Long.parseLong(lineContents[1]));
+    }
+  }
+
+  private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable {
+    try {
+      if (dump) {
+        primary.dump(primaryDbName, clause);
+      } else {
+        primary.load(replicatedDbName, primaryDbName, clause);
+      }
+      Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
+    } catch (SemanticException e) {
+      assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication"));
+    }
+  }
+
+  private Map<String, String> defaultAtlasConfMap() {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");

Review comment:
       use conf constants
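
       For example, HiveConf already carries the test flag as a constant, so the literal can be dropped:

           confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");

       The repl-specific keys ("hive.repl.atlas.endpoint" etc.) would need matching ConfVars entries if this patch doesn't add them already.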

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for tgtDb: {} to staging location: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws SemanticException {

Review comment:
       Most of these methods can be private. They are not called from anywhere else.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for tgtDb: {} to staging location: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws SemanticException {
+    LOG.info("HiveAtlasPlugin: Starting export from:{}", atlasReplInfo.getStagingDir());
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getSrcCluster()));
+      InputStream inputStream = atlasRestClient.exportData(exportRequest);

Review comment:
       This stream is created here but closed elsewhere. It should be closed in the same method that opens it.
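
       One way to address this, as a sketch (writeDumpToStagingDir is a hypothetical
       helper standing in for whatever currently consumes the stream downstream):

           try (InputStream inputStream = atlasRestClient.exportData(exportRequest)) {
             // the stream is consumed and released in the same method that opened it
             writeDumpToStagingDir(atlasReplInfo, inputStream);
           }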

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();

Review comment:
       It's only used for import. Is it reused anywhere? Do you need class-level variables?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);

Review comment:
       The endpoint should be validated to ensure it is a well-formed URL.
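
       A sketch of that validation (needs java.net.URL, java.net.MalformedURLException
       and java.net.URISyntaxException imports; the message text is illustrative):

           try {
             new URL(endpoint).toURI();  // rejects malformed endpoints early
           } catch (MalformedURLException | URISyntaxException e) {
             throw new SemanticException("Invalid Atlas endpoint: " + endpoint, e);
           }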

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  private transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo  = work.getAtlasReplInfo();
+      LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      int importCount = importAtlasMetadata(atlasReplInfo);
+      LOG.info("HiveAtlasPlugin: Atlas entities import count {}", importCount);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while loading atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(atlasReplInfo.getSrcDB(),
+            atlasReplInfo.getTgtDB(), atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getSrcCluster()),
+            atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getTgtCluster()), atlasReplInfo.getSrcFsUri(),
+            atlasReplInfo.getTgtFsUri(), atlasReplInfo.getSrcCluster());
+    AtlasImportResult result = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf())
+            .importData(importRequest, atlasReplInfo);
+    if (result == null || result.getProcessedEntities() == null) {
+      LOG.info("HiveAtlasPlugin: No entities found");

Review comment:
       Is the "HiveAtlasPlugin" log tag needed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;

Review comment:
       Consider moving to constructor-based initialization. This class has other public methods which can be called without the fields being properly initialized.
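
       A sketch of constructor-based initialization (hypothetical; a no-arg constructor
       is kept on the assumption that TaskFactory still needs one):

           public AtlasDumpTask() {
             this(new AtlasRestClientBuilder(), new AtlasRequestBuilder());
           }

           AtlasDumpTask(AtlasRestClientBuilder clientBuilder, AtlasRequestBuilder atlasRequestBuilder) {
             // with final fields, no public method can observe them uninitialized
             this.clientBuilder = clientBuilder;
             this.atlasRequestBuilder = atlasRequestBuilder;
           }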

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for tgtDb: {} to staging location: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {

Review comment:
       This can get atlasReplInfo from the work instead of taking it as a parameter.
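
       For illustration, the signature that suggestion leads to (a sketch based on the
       method body above):

           private long getCurrentTimestamp(String entityGuid) throws SemanticException {
             AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();  // read from the work, not a parameter
             AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
             return (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
                     ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
           }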

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for tgtDb: {} to staging location: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      atlasRestClient = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws SemanticException {

Review comment:
       Same here: get atlasReplInfo from the work instead of passing it in.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  // Tests just the configs; verifies no impact on existing replication.
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+    confMap.remove("hive.repl.atlas.replicatedto");

Review comment:
       In that case, this conf needn't be removed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.CLUSTER_NAME_SEPARATOR;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class);
+  public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+  static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+  static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+  private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+  private static final String ATTRIBUTE_NAME_NAME = ".name";
+  private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+  private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom";
+  private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+  private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME;
+  private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME;
+  private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION;
+  private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION;
+
+  private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+  private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer);
+    Map<String, Object> options = getOptions(atlasReplInfo);
+    return createRequest(itemsToExport, options);
+  }
+
+  public List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {

Review comment:
       Do all of these methods need to be public?
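
       A sketch of the narrower visibility (assuming only the request factory methods
       are called from outside this class):

           // helpers invoked only from createExportRequest/createImportRequest become private
           private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {
             List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
             atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME,
                     getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB())));
             return atlasObjectIds;
           }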

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS

Review comment:
       If Kerberos is the only strategy supported, do you need this enum?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;

Review comment:
       Is the conf needed in the work? It's already available in the task.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+  private final HiveConf conf;

Review comment:
       This conf is already part of the task. Is it needed in the work as well? The conf won't be serializable.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.CLUSTER_NAME_SEPARATOR;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class);
+  public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+  static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+  static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+  private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+  private static final String ATTRIBUTE_NAME_NAME = ".name";
+  private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+  private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom";
+  private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+  private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME;
+  private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME;
+  private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION;
+  private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION;
+
+  private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+  private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer);
+    Map<String, Object> options = getOptions(atlasReplInfo);
+    return createRequest(itemsToExport, options);
+  }
+
+  public List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {
+    List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
+    final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB());
+    atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+    return atlasObjectIds;
+  }
+
+  private AtlasExportRequest createRequest(final List<AtlasObjectId> itemsToExport,
+                                           final Map<String, Object> options) {
+    AtlasExportRequest request = new AtlasExportRequest() {
+      {
+        setItemsToExport(itemsToExport);
+        setOptions(options);
+      }
+    };
+    LOG.debug("createRequest: {}", request);
+    return request;
+  }
+
+  private Map<String, Object> getOptions(AtlasReplInfo atlasReplInfo) {
+    String targetCluster = atlasReplInfo.getTgtCluster();
+    Map<String, Object> options = new HashMap<>();
+    options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
+    options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp());
+    options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true);
+    if (targetCluster != null && !targetCluster.isEmpty()) {
+      options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster);
+    }
+    return options;
+  }
+
+  public AtlasObjectId getItemToExport(String srcCluster, String srcDB) {
+    final String qualifiedName = getQualifiedName(srcCluster, srcDB);
+    return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+  }
+
+  public String getQualifiedName(String clusterName, String srcDb) {
+    String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, srcDb.toLowerCase(), clusterName);
+    LOG.debug("AtlasProcess: getQualifiedName: {}", qualifiedName);
+    return qualifiedName;
+  }
+
+  public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet,
+                                                String sourceClusterName, String targetClusterName,
+                                                String sourcefsEndpoint, String targetFsEndpoint,
+                                                String sourceClusterFullyQualifiedName) {
+    AtlasImportRequest request = new AtlasImportRequest();
+    addTransforms(request.getOptions(),
+            sourceClusterName, targetClusterName,
+            sourceDataSet, targetDataSet,
+            sourcefsEndpoint, targetFsEndpoint);
+    addReplicatedFrom(request.getOptions(), sourceClusterFullyQualifiedName);
+    LOG.debug("AtlasProcess: importRequest: {}", request);
+    return request;
+  }
+
+  private void addTransforms(Map<String, String> options, String srcClusterName,
+                             String tgtClusterName, String sourceDataSet, String targetDataSet,
+                             String sourcefsEndpoint, String targetFsEndpoint) {
+    List<AttributeTransform> transforms = new ArrayList<>();
+    String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName);
+    addClassificationTransform(transforms,
+            String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName));
+    addClearReplicationAttributesTransform(transforms);
+    addClusterRenameTransform(transforms, srcClusterName, tgtClusterName);
+    if (!sourceDataSet.equals(targetDataSet)) {
+      addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet);
+    }
+    addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint);
+    options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms));
+  }
+
+  private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) {
+    transforms.add(create(
+            HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+    transforms.add(create(
+            HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+  }
+
+  private void addDataSetRenameTransform(List<AttributeTransform> transforms,
+                                         String sourceDataSet, String targetDataSet) {
+    transforms.add(create(
+            HIVE_DB_NAME, "EQUALS: " + sourceDataSet,
+            HIVE_DB_NAME, "SET: " + targetDataSet));
+  }
+
+  private void addClusterRenameTransform(List<AttributeTransform> transforms,
+                                         String srcClusterName, String tgtClustername) {
+    transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName,
+            HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername));
+  }
+
+  private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) {
+    options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName);
+  }
+
+  private void addClassificationTransform(List<AttributeTransform> transforms, String classificationName) {
+    transforms.add(create("__entity", "topLevel: ",
+            "__entity", "ADD_CLASSIFICATION: " + classificationName));
+  }
+
+  private String sanitizeForClassificationName(String s) {
+    if (s == null || s.isEmpty()) {
+      return s;
+    }
+    return s.replace('-', '_').replace(' ', '_');
+  }
+
+  private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) {
+    Map<String, String> actions = new HashMap<>();
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:");
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:");
+
+    transforms.add(new AttributeTransform(null, actions));
+  }
+
+  private AttributeTransform create(String conditionLhs, String conditionRhs,
+                                    String actionLhs, String actionRhs) {
+    return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs),
+            Collections.singletonMap(actionLhs, actionRhs));
+  }
+
+  public String getAtlasClusterName(String clusterName) {

Review comment:
       May not be needed for Apache.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPARATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public AtlasRestClientBuilder() {
+  }
+
+  public AtlasRestClient getClient(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return new NoOpAtlasRestClient();
+    }
+    return baseUrl(atlasEndpoint).create();
+  }
+
+  private AtlasRestClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPARATOR)) {
+      this.baseUrls = urls.split(URL_SEPARATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+    return this;
+  }
+
+  private AtlasRestClientBuilder setAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;

Review comment:
       If Kerberos is the only strategy supported, do you need this enum?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {

Review comment:
       Do we need this? Can we reuse the existing retry and add timeouts there?
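
       A sketch of what that reuse could look like, combining this timeout wrapper with
       the invokeWithRetry helper from RetryingClient quoted below (importCall,
       defaultResult and the 15-minute bound are hypothetical):

           private AtlasImportResult importWithRetryAndTimeout(Callable<AtlasImportResult> importCall,
                                                               AtlasImportResult defaultResult) throws Exception {
             // each attempt is bounded by a timeout; retries happen across attempts
             return invokeWithRetry(() -> runWithTimeout(importCall, 15, TimeUnit.MINUTES), defaultResult);
           }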

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Implement retry logic for service calls.
+ */
+public class RetryingClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT = 30 * 1000;
+  private static final int RETRY_COUNT_DEFAULT = 5;
+  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+  private static final int MAX_RETRY_COUNT = RETRY_COUNT_DEFAULT;
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT;
+
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {

Review comment:
       Can we reuse the existing retry classes?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426655712



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -82,6 +82,16 @@
   // Root base directory name for ranger.
   public static final String REPL_RANGER_BASE_DIR = "ranger";
 
+  // Root base directory name for atlas.
+  public static final String REPL_ATLAS_BASE_DIR = "atlas";
+
+  // Atlas meta data export file.
+  public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip";
+
+  // Config for hadoop default file system.
+  public static final String DEFAULT_FS_CONFIG = "fs.defaultFS";

Review comment:
       Is this set for CDP clusters as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427529619



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+
+  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.tgtDB = tgtDB;
+    this.stagingDir = stagingDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+    atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    return atlasReplInfo;
+  }
+
+  private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {

Review comment:
       The method implementation differs in the two cases; see the contrast below.
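
       To spell out the difference: on the dump side the source FS URI is taken
       straight from the local configuration, while on the load side it has to
       be read back from the _metadata file the dump wrote (both lines are taken
       from hunks in this thread):

           // dump side (AtlasDumpWork): source FS URI is the local default FS
           atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));

           // load side (AtlasLoadWork): source FS URI is read from the dump's _metadata file
           atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));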




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427227921



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -82,6 +82,16 @@
   // Root base directory name for ranger.
   public static final String REPL_RANGER_BASE_DIR = "ranger";
 
+  // Root base directory name for atlas.
+  public static final String REPL_ATLAS_BASE_DIR = "atlas";
+
+  // Atlas meta data export file.
+  public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip";
+
+  // Config for hadoop default file system.
+  public static final String DEFAULT_FS_CONFIG = "fs.defaultFS";

Review comment:
       yes

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPERATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPERATOR)) {
+      this.baseUrls = urls.split(URL_SEPERATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("HiveAtlasPlugin: authStrategy: {} : urls: {}: userGroupInformation: {}",
+              authStrategy, baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+    }
+    return this;
+  }
+
+  public AtlasRESTClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setAuthStrategy();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2;
+    LOG.info("HiveAtlasPlugin: authStrategyUsed: {} : {}", authStrategy, baseUrls);
+    switch (authStrategy) {
+      case KERBEROS:

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPERATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPERATOR)) {
+      this.baseUrls = urls.split(URL_SEPERATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426671412



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasProcess atlasProcess = new AtlasExportProcess();
+      String entityGuid = atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+              atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+      long currentModifiedTime = atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+      atlasProcess.run(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception during AtlasDumpTask.execute", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+    Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    atlasReplInfo.getSrcFsUri(),
+                    String.valueOf(lastModifiedTime)
+            )
+    );
+    Utils.writeOutput(listValues, dumpFile, conf, true);

Review comment:
       yes
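
       Concretely, the _metadata file written here carries a single
       tab-separated line -- the source FS URI followed by the change-marker
       timestamp -- which the load and incremental-dump paths later split on
       "\t". Illustrative content (tab shown as \t):

           hdfs://source-namenode:8020\t1589716300000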




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427213470



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasImportProcess.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs Atlas' Import.
+ */
+public class AtlasImportProcess extends AtlasProcess {
+  protected static final Logger LOG = LoggerFactory.getLogger(AtlasImportProcess.class);

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements AtlasRESTClient{

Review comment:
       Fixed

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/DummyAtlasRESTClient.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+/**
+ * Dummy implementation of RESTClient, encapsulates Atlas' REST APIs.
+ * To be used for testing.
+ */
+public class DummyAtlasRESTClient implements AtlasRESTClient {

Review comment:
       Done
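
       For illustration, such a stub would return canned results so tests can
       exercise the replication pipeline without a live Atlas server -- roughly
       along these lines (method names inferred from the imports above; this is
       a sketch, not the actual class body):

           @Override
           public InputStream exportData(AtlasExportRequest request) {
             // canned export payload instead of a real Atlas export
             return new ByteArrayInputStream("dummy".getBytes(Charset.defaultCharset()));
           }

           @Override
           public String getEntityGuid(String entityType, String attributeName, String qualifiedName) {
             // any GUID-shaped value will do for tests
             return UUID.randomUUID().toString();
           }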




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426666281



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String tgtDB = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+    atlasReplInfo.setTimeStamp(lastTimeStamp);
+    return atlasReplInfo;
+  }
+
+  private long lastStoredTimeStamp() throws SemanticException {
+    Path prevMetadataPath = new Path(getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = prevMetadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return Long.parseLong(lineContents[1]);
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {
+    String val = conf.get(config);
+    if (StringUtils.isEmpty(val)) {
+      throw new SemanticException(config + " is mandatory config for Atlas metadata replication");
+    }
+    return val;
+  }
+
+  public boolean isBootstrap() {
+    return bootstrap;
+  }
+
+  public Path getPrevAtlasDumpDir() {

Review comment:
       During an incremental dump, we load the changeMarker info from the previous dump.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427212447



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+    confMap.remove("hive.repl.atlas.replicatedto");

Review comment:
       No, it need not be removed, but I was testing that replication works fine without this config.
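
       For reference, the dump in this test is issued with an Atlas WITH clause
       along these lines (a sketch -- key names are inferred from the ConfVars
       used in this patch, and the endpoint/cluster values are illustrative):

           REPL DUMP primarydb WITH (
             'hive.repl.include.atlas.metadata'='true',
             'hive.repl.atlas.endpoint'='http://localhost:21000/atlas',
             'hive.repl.atlas.replicatedto'='primarydb_replica',
             'hive.repl.source.cluster.name'='cluster0',
             'hive.repl.target.cluster.name'='cluster1'
           )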




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] aasha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r428057392



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo  = createAtlasReplInfo();
+      LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      int importCount = importAtlasMetadata(atlasReplInfo);
+      LOG.info("Atlas entities import count {}", importCount);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while loading atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+    String errorFormat = "%s is mandatory config for Atlas metadata replication";
+    //Also validates URL for endpoint.
+    String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
+            .toString();
+    String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
+    String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(),
+            srcCluster, tgtCluster, work.getStagingDir(), conf);
+    atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+    atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    return atlasReplInfo;
+  }
+
+  private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
+    Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = metadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return lineContents[0];
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {

Review comment:
       Can be removed
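
       For context, AtlasLoadTask already routes through ReplUtils.getNonEmpty,
       which is presumably along these lines (a sketch inferred from its usage
       earlier in this hunk, not the actual ReplUtils source):

           public static String getNonEmpty(String config, HiveConf conf, String errorMsgFormat)
                   throws SemanticException {
             String val = conf.get(config);
             if (StringUtils.isEmpty(val)) {
               throw new SemanticException(String.format(errorMsgFormat, config));
             }
             return val;
           }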




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426668852



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+    String tgtDB = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+    String srcCluster = getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+    String tgtCluster = getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB, srcCluster, tgtCluster, stagingDir, conf);
+    atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+    long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+    atlasReplInfo.setTimeStamp(lastTimeStamp);
+    return atlasReplInfo;
+  }
+
+  private long lastStoredTimeStamp() throws SemanticException {
+    Path prevMetadataPath = new Path(getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
+    BufferedReader br = null;
+    try {
+      FileSystem fs = prevMetadataPath.getFileSystem(conf);
+      br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      return Long.parseLong(lineContents[1]);
+    } catch (Exception ex) {
+      throw new SemanticException(ex);
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+  }
+
+  private String getNonEmpty(String config) throws SemanticException {
+    String val = conf.get(config);
+    if (StringUtils.isEmpty(val)) {
+      throw new SemanticException(config + " is mandatory config for Atlas metadata replication");
+    }
+    return val;
+  }
+
+  public boolean isBootstrap() {

Review comment:
       The changeMarker is different for bootstrap and incremental dumps; see the sketch below.
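
       A sketch of how the changeMarker would feed into the export request
       (the option keys are Atlas' AtlasExportRequest constants; the surrounding
       wiring is illustrative, not the exact PR code):

           Map<String, Object> options = new HashMap<>();
           if (atlasReplInfo.getTimeStamp() == 0L) {
             // bootstrap: full export of the source database's entities
             options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_FULL);
           } else {
             // incremental: only entities changed after the stored marker
             options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
             options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp());
           }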




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] github-actions[bot] commented on pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #1021:
URL: https://github.com/apache/hive/pull/1021#issuecomment-664708321


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
   Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427415900



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {

Review comment:
       Yes, we can. However, that requires enhancing the existing class: ideally we would parameterize the timeout and retry count, and a timeout-based approach would need to execute the call in a separate thread. Since the existing class is used in its current form in multiple places, I will file a separate JIRA to address that.
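
       For reference, the signature above suggests the classic Future-based
       timeout pattern -- roughly as follows (a sketch under that assumption,
       using only the java.util.concurrent imports already in this file):

           private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
             ExecutorService executor = Executors.newSingleThreadExecutor();
             Future<T> future = executor.submit(callable);
             try {
               // bound the blocking Atlas call; a TimeoutException surfaces if it
               // overruns, and an ExecutionException wraps any failure of the call
               return future.get(timeout, timeUnit);
             } finally {
               // cancel the call and release the worker thread either way
               future.cancel(true);
               executor.shutdownNow();
             }
           }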




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #1021: HIVE-23353 : Atlas metadata replication scheduling

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426673131



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RESTClientBuilder.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * RestClientBuilder for AtlasRESTClient.
+ */
+public class RESTClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPERATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public RESTClientBuilder() {
+  }
+
+  public RESTClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPERATOR)) {
+      this.baseUrls = urls.split(URL_SEPERATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+
+    return this;
+  }
+
+  public RESTClientBuilder setAuthStrategy() throws SemanticException {
+    return inferKerberosAuthStrategy();
+  }
+
+  private RESTClientBuilder inferKerberosAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;
+      this.userGroupInformation = UserGroupInformation.getLoginUser();
+      LOG.info("HiveAtlasPlugin: authStrategy: {} : urls: {}: userGroupInformation: {}",
+              authStrategy, baseUrls, userGroupInformation);
+    } catch (Exception e) {
+      throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e);
+    }
+    return this;
+  }
+
+  public AtlasRESTClient create() throws SemanticException {
+    if (baseUrls == null || baseUrls.length == 0) {
+      throw new SemanticException("baseUrls is not set.");
+    }
+    setAuthStrategy();
+    initializeAtlasApplicationProperties();
+    AtlasClientV2 clientV2;
+    LOG.info("HiveAtlasPlugin: authStrategyUsed: {} : {}", authStrategy, baseUrls);
+    switch (authStrategy) {
+      case KERBEROS:
+        clientV2 = new AtlasClientV2(this.userGroupInformation,
+                this.userGroupInformation.getShortUserName(), baseUrls);
+        return new AtlasRESTClientImpl(clientV2);
+      default:
+        throw new SemanticException("AtlasRESTClient: unsupported auth strategy:" + authStrategy);
+    }
+  }
+
+  private void initializeAtlasApplicationProperties() throws SemanticException {
+    try {
+      ApplicationProperties.set(getClientProperties());

Review comment:
       Yes, that's Atlas' interface. 
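
       For context, the client properties pushed into ApplicationProperties.set
       would be assembled roughly like this (a sketch; the keys are the
       constants declared in this class, the values illustrative):

           private Configuration getClientProperties() {
             Properties props = new Properties();
             props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl);
             props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true");
             props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "3");
             props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "10000");
             return ConfigurationConverter.getConfiguration(props);
           }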




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org