You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/06/02 06:30:46 UTC

[hive] branch master updated: HIVE-23514: Add Atlas metadata replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ab407  HIVE-23514: Add Atlas metadata replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)
55ab407 is described below

commit 55ab407da05eed1991758ee9baa7cca9d1412ed1
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Tue Jun 2 12:00:33 2020 +0530

    HIVE-23514: Add Atlas metadata replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../hadoop/hive/ql/exec/repl/AtlasDumpTask.java    | 21 ++++--
 .../hadoop/hive/ql/exec/repl/AtlasLoadTask.java    | 10 ++-
 .../hadoop/hive/ql/parse/repl/ReplState.java       |  4 ++
 .../ql/parse/repl/dump/log/AtlasDumpLogger.java    | 47 ++++++++++++
 .../parse/repl/dump/log/state/AtlasDumpBegin.java  | 42 +++++++++++
 .../ql/parse/repl/dump/log/state/AtlasDumpEnd.java | 47 ++++++++++++
 .../ql/parse/repl/load/log/AtlasLoadLogger.java    | 48 +++++++++++++
 .../parse/repl/load/log/state/AtlasLoadBegin.java  | 45 ++++++++++++
 .../ql/parse/repl/load/log/state/AtlasLoadEnd.java | 58 +++++++++++++++
 .../hive/ql/exec/repl/TestAtlasDumpTask.java       | 83 ++++++++++++++++++++++
 .../hive/ql/exec/repl/TestAtlasLoadTask.java       | 73 +++++++++++++++++++
 .../hive/ql/exec/repl/TestRangerLoadTask.java      |  2 +-
 12 files changed, 470 insertions(+), 10 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 26cdc6b..be48f99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,14 +68,19 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
       AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
       LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
               atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
+              atlasReplInfo.getStagingDir().toString());
+      replLogger.startLog();
       atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
               .getClient(atlasReplInfo.getConf());
       AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
       String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(),
               atlasReplInfo.getSrcDB());
       long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
-      dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+      long numBytesWritten = dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+      LOG.debug("Finished dumping atlas metadata, total:{} bytes written", numBytesWritten);
       createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      replLogger.endLog(0L);
       return 0;
     } catch (Exception e) {
       LOG.error("Exception while dumping atlas metadata", e);
@@ -83,7 +89,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     }
   }
 
-  private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+  public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
     String errorFormat = "%s is mandatory config for Atlas metadata replication";
     //Also validates URL for endpoint.
     String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
@@ -99,7 +105,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     return atlasReplInfo;
   }
 
-  private long lastStoredTimeStamp() throws SemanticException {
+  public long lastStoredTimeStamp() throws SemanticException {
     Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
     BufferedReader br = null;
     try {
@@ -132,17 +138,17 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     return ret;
   }
 
-  private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
+  public long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
           throws SemanticException {
     InputStream inputStream = null;
+    long numBytesWritten = 0L;
     try {
       AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
               atlasReplInfo.getSrcCluster());
       inputStream = atlasRestClient.exportData(exportRequest);
       FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
       Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
-      long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
-      LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten);
+      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
     } catch (SemanticException ex) {
       throw ex;
     } catch (Exception ex) {
@@ -156,6 +162,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
         }
       }
     }
+    return numBytesWritten;
   }
 
   private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName,
@@ -174,7 +181,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
     return guid;
   }
 
-  private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+  public void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
     Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
     List<List<String>> listValues = new ArrayList<>();
     listValues.add(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index fceded5..fa18bf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +58,11 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
       AtlasReplInfo atlasReplInfo  = createAtlasReplInfo();
       LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
               atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+      AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
+              atlasReplInfo.getStagingDir().toString());
+      replLogger.startLog();
       int importCount = importAtlasMetadata(atlasReplInfo);
+      replLogger.endLog(importCount);
       LOG.info("Atlas entities import count {}", importCount);
       return 0;
     } catch (Exception e) {
@@ -66,7 +72,7 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
     }
   }
 
-  private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+  public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
     String errorFormat = "%s is mandatory config for Atlas metadata replication";
     //Also validates URL for endpoint.
     String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
@@ -105,7 +111,7 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
     }
   }
 
-  private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+  public int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
     AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
     AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(
             atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
index e441153..34fd2fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
@@ -46,11 +46,15 @@ public abstract class ReplState {
     TABLE_DUMP,
     FUNCTION_DUMP,
     EVENT_DUMP,
+    ATLAS_DUMP_START,
+    ATLAS_DUMP_END,
     RANGER_DUMP_START,
     RANGER_DUMP_END,
     TABLE_LOAD,
     FUNCTION_LOAD,
     EVENT_LOAD,
+    ATLAS_LOAD_START,
+    ATLAS_LOAD_END,
     RANGER_LOAD_START,
     RANGER_LOAD_END,
     END
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java
new file mode 100644
index 0000000..840c51e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log;
+
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpBegin;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpEnd;
+
+/**
+ * AtlasDumpLogger.
+ * Repl logger for Atlas metadata dump.
+ **/
+public class AtlasDumpLogger extends ReplLogger<Long> {
+  private String dbName;
+  private String dumpDir;
+
+  public AtlasDumpLogger(String dbName, String dumpDir) {
+    this.dbName = dbName;
+    this.dumpDir = dumpDir;
+  }
+
+  @Override
+  public void startLog() {
+    new AtlasDumpBegin(dbName).log(LogTag.ATLAS_DUMP_START);
+  }
+
+  @Override
+  public void endLog(Long count) {
+    new AtlasDumpEnd(dbName, dumpDir).log(LogTag.ATLAS_DUMP_END);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java
new file mode 100644
index 0000000..02237c5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * AtlasDumpBegin.
+ *
+ * ReplState to define Atlas Dump Start.
+ **/
+public class AtlasDumpBegin extends ReplState {
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String dbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private Long dumpStartTime;
+
+  public AtlasDumpBegin(String dbName) {
+    this.dbName = dbName;
+    this.dumpStartTime = System.currentTimeMillis() / 1000;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java
new file mode 100644
index 0000000..074f94d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * AtlasDumpEnd.
+ *
+ * ReplState to define Atlas Dump End.
+ **/
+public class AtlasDumpEnd extends ReplState {
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String dbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private Long dumpEndTime;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String dumpDir;
+
+  public AtlasDumpEnd(String dbName, String dumpDir) {
+    this.dbName = dbName;
+    this.dumpEndTime = System.currentTimeMillis() / 1000;
+    this.dumpDir = dumpDir;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java
new file mode 100644
index 0000000..603683d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log;
+
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadBegin;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadEnd;
+
+/**
+ * Repl logger for Atlas metadata load task.
+ **/
+public class AtlasLoadLogger extends ReplLogger<Integer> {
+  private String sourceDbName;
+  private String targetDbName;
+  private String dumpDir;
+
+  public AtlasLoadLogger(String sourceDbName, String targetDbName, String dumpDir) {
+    this.sourceDbName = sourceDbName;
+    this.targetDbName = targetDbName;
+    this.dumpDir = dumpDir;
+  }
+
+  @Override
+  public void startLog() {
+    new AtlasLoadBegin(sourceDbName, targetDbName).log(LogTag.ATLAS_LOAD_START);
+  }
+
+  @Override
+  public void endLog(Integer count) {
+    new AtlasLoadEnd(sourceDbName, targetDbName, count, dumpDir).log(LogTag.ATLAS_LOAD_END);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java
new file mode 100644
index 0000000..a29881d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Replication state for Atlas Load Begin.
+ **/
+public class AtlasLoadBegin extends ReplState {
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String sourceDbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String targetDbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private Long loadStartTime;
+
+  public AtlasLoadBegin(String sourceDbName, String targetDbName) {
+    this.sourceDbName = sourceDbName;
+    this.targetDbName = targetDbName;
+    this.loadStartTime = System.currentTimeMillis() / 1000;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java
new file mode 100644
index 0000000..c234af4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Replication state for Atlas Load End.
+ **/
+public class AtlasLoadEnd extends ReplState {
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String sourceDbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String targetDbName;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private Long numOfEntities;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private Long loadEndTime;
+
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  @JsonProperty
+  private String dumpDir;
+
+  public AtlasLoadEnd(String sourceDbName,
+                      String targetDbName,
+                      long numOfEntities,
+                      String dumpDir) {
+    this.sourceDbName = sourceDbName;
+    this.targetDbName = targetDbName;
+    this.numOfEntities = numOfEntities;
+    this.loadEndTime = System.currentTimeMillis() / 1000;
+    this.dumpDir = dumpDir;
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
new file mode 100644
index 0000000..dee332f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.ArgumentMatchers.any;
+
+/**
+ * Unit test class for testing Atlas metadata Dump.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestAtlasDumpTask {
+
+  @Mock
+  private AtlasDumpTask atlasDumpTask;
+
+  @Mock
+  private HiveConf conf;
+
+  @Test
+  public void testAtlasDumpMetrics() throws Exception {
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB",
+            "tgtDb", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf);
+    atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020");
+    atlasReplInfo.setTgtFsUri("hdfs:tgtFsUri:8020");
+    Mockito.when(atlasDumpTask.createAtlasReplInfo()).thenReturn(atlasReplInfo);
+    Mockito.when(atlasDumpTask.lastStoredTimeStamp()).thenReturn(0L);
+    Mockito.when(atlasDumpTask.dumpAtlasMetaData(any(AtlasRequestBuilder.class), any(AtlasReplInfo.class)))
+            .thenReturn(0L);
+    Mockito.when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true);
+    Logger logger = Mockito.mock(Logger.class);
+    Whitebox.setInternalState(ReplState.class, logger);
+    Mockito.when(atlasDumpTask.execute()).thenCallRealMethod();
+    int status = atlasDumpTask.execute();
+    Assert.assertEquals(0, status);
+    ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Object> eventCaptor = ArgumentCaptor.forClass(Object.class);
+    ArgumentCaptor<Object> eventDetailsCaptor = ArgumentCaptor.forClass(Object.class);
+    Mockito.verify(logger,
+            Mockito.times(2)).info(replStateCaptor.capture(),
+            eventCaptor.capture(), eventDetailsCaptor.capture());
+    Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0));
+    Assert.assertEquals("ATLAS_DUMP_START", eventCaptor.getAllValues().get(0));
+    Assert.assertEquals("ATLAS_DUMP_END", eventCaptor.getAllValues().get(1));
+    Assert.assertTrue(eventDetailsCaptor.getAllValues().get(1).toString(), eventDetailsCaptor.getAllValues().get(0)
+            .toString().contains("{\"dbName\":\"srcDB\",\"dumpStartTime"));
+    Assert.assertTrue(eventDetailsCaptor
+            .getAllValues().get(1).toString().contains("{\"dbName\":\"srcDB\",\"dumpEndTime\""));
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
new file mode 100644
index 0000000..bb5fe0b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+
+/**
+ * Unit test class for testing Atlas metadata load.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestAtlasLoadTask {
+  @Mock
+  private AtlasLoadTask atlasLoadTask;
+
+  @Mock
+  private HiveConf conf;
+
+  @Test
+  public void testAtlasLoadMetrics() throws Exception {
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB",
+            "tgtDB", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf);
+    atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020");
+    atlasReplInfo.setTgtFsUri("hdfs:tgtFsUri:8020");
+    Mockito.when(atlasLoadTask.createAtlasReplInfo()).thenReturn(atlasReplInfo);
+    Mockito.when(atlasLoadTask.importAtlasMetadata(atlasReplInfo)).thenReturn(1);
+    Logger logger = Mockito.mock(Logger.class);
+    Whitebox.setInternalState(ReplState.class, logger);
+    Mockito.when(atlasLoadTask.execute()).thenCallRealMethod();
+    int status = atlasLoadTask.execute();
+    Assert.assertEquals(0, status);
+    ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<Object> eventCaptor = ArgumentCaptor.forClass(Object.class);
+    ArgumentCaptor<Object> eventDetailsCaptor = ArgumentCaptor.forClass(Object.class);
+    Mockito.verify(logger,
+            Mockito.times(2)).info(replStateCaptor.capture(),
+            eventCaptor.capture(), eventDetailsCaptor.capture());
+    Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0));
+    Assert.assertEquals("ATLAS_LOAD_START", eventCaptor.getAllValues().get(0));
+    Assert.assertEquals("ATLAS_LOAD_END", eventCaptor.getAllValues().get(1));
+    Assert.assertTrue(eventDetailsCaptor.getAllValues().get(0)
+            .toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\":\"tgtDB\",\"loadStartTime\":"));
+    Assert.assertTrue(eventDetailsCaptor
+            .getAllValues().get(1).toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\""
+                    + ":\"tgtDB\",\"numOfEntities\":1,\"loadEndTime\""));
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
index 0559d1b..af41e3d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
@@ -44,7 +44,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_HIVE_SER
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_REST_URL;
 
 /**
- * Unit test class for testing Ranger Dump.
+ * Unit test class for testing Ranger Load.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class TestRangerLoadTask {