You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/06/02 06:30:46 UTC
[hive] branch master updated: HIVE-23514: Add Atlas metadata
replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)
This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 55ab407 HIVE-23514: Add Atlas metadata replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)
55ab407 is described below
commit 55ab407da05eed1991758ee9baa7cca9d1412ed1
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Tue Jun 2 12:00:33 2020 +0530
HIVE-23514: Add Atlas metadata replication metrics (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 21 ++++--
.../hadoop/hive/ql/exec/repl/AtlasLoadTask.java | 10 ++-
.../hadoop/hive/ql/parse/repl/ReplState.java | 4 ++
.../ql/parse/repl/dump/log/AtlasDumpLogger.java | 47 ++++++++++++
.../parse/repl/dump/log/state/AtlasDumpBegin.java | 42 +++++++++++
.../ql/parse/repl/dump/log/state/AtlasDumpEnd.java | 47 ++++++++++++
.../ql/parse/repl/load/log/AtlasLoadLogger.java | 48 +++++++++++++
.../parse/repl/load/log/state/AtlasLoadBegin.java | 45 ++++++++++++
.../ql/parse/repl/load/log/state/AtlasLoadEnd.java | 58 +++++++++++++++
.../hive/ql/exec/repl/TestAtlasDumpTask.java | 83 ++++++++++++++++++++++
.../hive/ql/exec/repl/TestAtlasLoadTask.java | 73 +++++++++++++++++++
.../hive/ql/exec/repl/TestRangerLoadTask.java | 2 +-
12 files changed, 470 insertions(+), 10 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 26cdc6b..be48f99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,14 +68,19 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location: {}",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+ AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
+ atlasReplInfo.getStagingDir().toString());
+ replLogger.startLog();
atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
.getClient(atlasReplInfo.getConf());
AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(),
atlasReplInfo.getSrcDB());
long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid);
- dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+ long numBytesWritten = dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo);
+ LOG.debug("Finished dumping atlas metadata, total:{} bytes written", numBytesWritten);
createDumpMetadata(atlasReplInfo, currentModifiedTime);
+ replLogger.endLog(0L);
return 0;
} catch (Exception e) {
LOG.error("Exception while dumping atlas metadata", e);
@@ -83,7 +89,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
}
}
- private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+ public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
@@ -99,7 +105,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
return atlasReplInfo;
}
- private long lastStoredTimeStamp() throws SemanticException {
+ public long lastStoredTimeStamp() throws SemanticException {
Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME);
BufferedReader br = null;
try {
@@ -132,17 +138,17 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
return ret;
}
- private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
+ public long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo)
throws SemanticException {
InputStream inputStream = null;
+ long numBytesWritten = 0L;
try {
AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
atlasReplInfo.getSrcCluster());
inputStream = atlasRestClient.exportData(exportRequest);
FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf());
Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
- long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
- LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten);
+ numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
} catch (SemanticException ex) {
throw ex;
} catch (Exception ex) {
@@ -156,6 +162,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
}
}
}
+ return numBytesWritten;
}
private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName,
@@ -174,7 +181,7 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
return guid;
}
- private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
+ public void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException {
Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME);
List<List<String>> listValues = new ArrayList<>();
listValues.add(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index fceded5..fa18bf3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,11 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
+ AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
+ atlasReplInfo.getStagingDir().toString());
+ replLogger.startLog();
int importCount = importAtlasMetadata(atlasReplInfo);
+ replLogger.endLog(importCount);
LOG.info("Atlas entities import count {}", importCount);
return 0;
} catch (Exception e) {
@@ -66,7 +72,7 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
}
}
- private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
+ public AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
@@ -105,7 +111,7 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
}
}
- private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+ public int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
index e441153..34fd2fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplState.java
@@ -46,11 +46,15 @@ public abstract class ReplState {
TABLE_DUMP,
FUNCTION_DUMP,
EVENT_DUMP,
+ ATLAS_DUMP_START,
+ ATLAS_DUMP_END,
RANGER_DUMP_START,
RANGER_DUMP_END,
TABLE_LOAD,
FUNCTION_LOAD,
EVENT_LOAD,
+ ATLAS_LOAD_START,
+ ATLAS_LOAD_END,
RANGER_LOAD_START,
RANGER_LOAD_END,
END
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java
new file mode 100644
index 0000000..840c51e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/AtlasDumpLogger.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log;
+
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpBegin;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.state.AtlasDumpEnd;
+
+/**
+ * AtlasDumpLogger.
+ * ReplLogger for Atlas metadata dump.
+ **/
+public class AtlasDumpLogger extends ReplLogger<Long> {
+ private String dbName;
+ private String dumpDir;
+
+ public AtlasDumpLogger(String dbName, String dumpDir) {
+ this.dbName = dbName;
+ this.dumpDir = dumpDir;
+ }
+
+ @Override
+ public void startLog() {
+ new AtlasDumpBegin(dbName).log(LogTag.ATLAS_DUMP_START);
+ }
+
+ @Override
+ public void endLog(Long count) {
+ new AtlasDumpEnd(dbName, dumpDir).log(LogTag.ATLAS_DUMP_END);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java
new file mode 100644
index 0000000..02237c5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpBegin.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * AtlasDumpBegin.
+ *
+ * ReplState to define Atlas Dump Start.
+ **/
+public class AtlasDumpBegin extends ReplState {
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String dbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private Long dumpStartTime;
+
+ public AtlasDumpBegin(String dbName) {
+ this.dbName = dbName;
+ this.dumpStartTime = System.currentTimeMillis() / 1000;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java
new file mode 100644
index 0000000..074f94d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/log/state/AtlasDumpEnd.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * AtlasDumpEnd.
+ *
+ * ReplState to define Atlas Dump End.
+ **/
+public class AtlasDumpEnd extends ReplState {
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String dbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private Long dumpEndTime;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String dumpDir;
+
+ public AtlasDumpEnd(String dbName, String dumpDir) {
+ this.dbName = dbName;
+ this.dumpEndTime = System.currentTimeMillis() / 1000;
+ this.dumpDir = dumpDir;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java
new file mode 100644
index 0000000..603683d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/AtlasLoadLogger.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log;
+
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState.LogTag;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadBegin;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.state.AtlasLoadEnd;
+
+/**
+ * Repl logger for Atlas metadata load task.
+ **/
+public class AtlasLoadLogger extends ReplLogger<Integer> {
+ private String sourceDbName;
+ private String targetDbName;
+ private String dumpDir;
+
+ public AtlasLoadLogger(String sourceDbName, String targetDbName, String dumpDir) {
+ this.sourceDbName = sourceDbName;
+ this.targetDbName = targetDbName;
+ this.dumpDir = dumpDir;
+ }
+
+ @Override
+ public void startLog() {
+ new AtlasLoadBegin(sourceDbName, targetDbName).log(LogTag.ATLAS_LOAD_START);
+ }
+
+ @Override
+ public void endLog(Integer count) {
+ new AtlasLoadEnd(sourceDbName, targetDbName, count, dumpDir).log(LogTag.ATLAS_LOAD_END);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java
new file mode 100644
index 0000000..a29881d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadBegin.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Replication state for Atlas Load Begin.
+ **/
+public class AtlasLoadBegin extends ReplState {
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String sourceDbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String targetDbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private Long loadStartTime;
+
+ public AtlasLoadBegin(String sourceDbName, String targetDbName) {
+ this.sourceDbName = sourceDbName;
+ this.targetDbName = targetDbName;
+ this.loadStartTime = System.currentTimeMillis() / 1000;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java
new file mode 100644
index 0000000..c234af4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/AtlasLoadEnd.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log.state;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Replication state for Atlas Load End.
+ **/
+public class AtlasLoadEnd extends ReplState {
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String sourceDbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String targetDbName;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private Long numOfEntities;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private Long loadEndTime;
+
+ @SuppressFBWarnings("URF_UNREAD_FIELD")
+ @JsonProperty
+ private String dumpDir;
+
+ public AtlasLoadEnd(String sourceDbName,
+ String targetDbName,
+ long numOfEntities,
+ String dumpDir) {
+ this.sourceDbName = sourceDbName;
+ this.targetDbName = targetDbName;
+ this.numOfEntities = numOfEntities;
+ this.loadEndTime = System.currentTimeMillis() / 1000;
+ this.dumpDir = dumpDir;
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
new file mode 100644
index 0000000..dee332f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.ArgumentMatchers.any;
+
+/**
+ * Unit test class for testing Atlas metadata Dump.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestAtlasDumpTask {
+
+ @Mock
+ private AtlasDumpTask atlasDumpTask;
+
+ @Mock
+ private HiveConf conf;
+
+ @Test
+ public void testAtlasDumpMetrics() throws Exception {
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB",
+ "tgtDb", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf);
+ atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020");
+ atlasReplInfo.setTgtFsUri("hdfs://tgtFsUri:8020");
+ Mockito.when(atlasDumpTask.createAtlasReplInfo()).thenReturn(atlasReplInfo);
+ Mockito.when(atlasDumpTask.lastStoredTimeStamp()).thenReturn(0L);
+ Mockito.when(atlasDumpTask.dumpAtlasMetaData(any(AtlasRequestBuilder.class), any(AtlasReplInfo.class)))
+ .thenReturn(0L);
+ Mockito.when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true);
+ Logger logger = Mockito.mock(Logger.class);
+ Whitebox.setInternalState(ReplState.class, logger);
+ Mockito.when(atlasDumpTask.execute()).thenCallRealMethod();
+ int status = atlasDumpTask.execute();
+ Assert.assertEquals(0, status);
+ ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Object> eventCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Object> eventDetailsCaptor = ArgumentCaptor.forClass(Object.class);
+ Mockito.verify(logger,
+ Mockito.times(2)).info(replStateCaptor.capture(),
+ eventCaptor.capture(), eventDetailsCaptor.capture());
+ Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0));
+ Assert.assertEquals("ATLAS_DUMP_START", eventCaptor.getAllValues().get(0));
+ Assert.assertEquals("ATLAS_DUMP_END", eventCaptor.getAllValues().get(1));
+ Assert.assertTrue(eventDetailsCaptor.getAllValues().get(1).toString(), eventDetailsCaptor.getAllValues().get(0)
+ .toString().contains("{\"dbName\":\"srcDB\",\"dumpStartTime"));
+ Assert.assertTrue(eventDetailsCaptor
+ .getAllValues().get(1).toString().contains("{\"dbName\":\"srcDB\",\"dumpEndTime\""));
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
new file mode 100644
index 0000000..bb5fe0b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+
+/**
+ * Unit test class for testing Atlas metadata load.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestAtlasLoadTask {
+ @Mock
+ private AtlasLoadTask atlasLoadTask;
+
+ @Mock
+ private HiveConf conf;
+
+ @Test
+ public void testAtlasLoadMetrics() throws Exception {
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB",
+ "tgtDB", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf);
+ atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020");
+ atlasReplInfo.setTgtFsUri("hdfs://tgtFsUri:8020");
+ Mockito.when(atlasLoadTask.createAtlasReplInfo()).thenReturn(atlasReplInfo);
+ Mockito.when(atlasLoadTask.importAtlasMetadata(atlasReplInfo)).thenReturn(1);
+ Logger logger = Mockito.mock(Logger.class);
+ Whitebox.setInternalState(ReplState.class, logger);
+ Mockito.when(atlasLoadTask.execute()).thenCallRealMethod();
+ int status = atlasLoadTask.execute();
+ Assert.assertEquals(0, status);
+ ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<Object> eventCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Object> eventDetailsCaptor = ArgumentCaptor.forClass(Object.class);
+ Mockito.verify(logger,
+ Mockito.times(2)).info(replStateCaptor.capture(),
+ eventCaptor.capture(), eventDetailsCaptor.capture());
+ Assert.assertEquals("REPL::{}: {}", replStateCaptor.getAllValues().get(0));
+ Assert.assertEquals("ATLAS_LOAD_START", eventCaptor.getAllValues().get(0));
+ Assert.assertEquals("ATLAS_LOAD_END", eventCaptor.getAllValues().get(1));
+ Assert.assertTrue(eventDetailsCaptor.getAllValues().get(0)
+ .toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\":\"tgtDB\",\"loadStartTime\":"));
+ Assert.assertTrue(eventDetailsCaptor
+ .getAllValues().get(1).toString().contains("{\"sourceDbName\":\"srcDB\",\"targetDbName\""
+ + ":\"tgtDB\",\"numOfEntities\":1,\"loadEndTime\""));
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
index 0559d1b..af41e3d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
@@ -44,7 +44,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_HIVE_SER
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_REST_URL;
/**
- * Unit test class for testing Ranger Dump.
+ * Unit test class for testing Ranger Load.
*/
@RunWith(MockitoJUnitRunner.class)
public class TestRangerLoadTask {