You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/11/26 12:44:11 UTC
falcon git commit: FALCON-1480 Gather data transfer details of Hive DR. Contributed by Peeyush Bishnoi.
Repository: falcon
Updated Branches:
refs/heads/master 2bf90130d -> baab41425
FALCON-1480 Gather data transfer details of Hive DR. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/baab4142
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/baab4142
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/baab4142
Branch: refs/heads/master
Commit: baab41425fbdf2e7e6dc4d55195e826d540b444c
Parents: 2bf9013
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Nov 26 15:27:54 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Nov 26 17:12:13 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 4 ++
addons/hivedr/pom.xml | 5 +-
.../java/org/apache/falcon/hive/HiveDRArgs.java | 3 +-
.../java/org/apache/falcon/hive/HiveDRTool.java | 18 +++++-
.../falcon/hive/mapreduce/CopyMapper.java | 11 ++++
.../org/apache/falcon/hive/util/EventUtils.java | 19 +++++-
.../apache/falcon/hive/util/HiveDRUtils.java | 13 ++++
.../hive-disaster-recovery-secure-workflow.xml | 2 +
.../hive-disaster-recovery-workflow.xml | 2 +
.../falcon/job/HiveReplicationCounters.java | 62 ++++++++++++++++++++
.../apache/falcon/job/JobCountersHandler.java | 2 +
.../java/org/apache/falcon/job/JobType.java | 3 +-
12 files changed, 139 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69aa1f4..a2ca7f9 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,11 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1480 Gather data transfer details of Hive DR. (Peeyush Bishnoi via Ajay Yadava)
+
FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao)
+
+ FALCON-1480 Gather data transfer details of Hive DR(Peeyush Bishnoi via Ajay Yadava)
FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index f98e8c4..346ffdb 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -99,7 +99,10 @@
<artifactId>falcon-hadoop-dependencies</artifactId>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-metrics</artifactId>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 574524d..5490232 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -78,7 +78,8 @@ public enum HiveDRArgs {
FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
KEEP_HISTORY("keepHistory", "Keep history of events file generated", false),
- EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false);
+ EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false),
+ COUNTER_LOGDIR("counterLogDir", "Log directory to store counter file", false);
private final String name;
private final String description;
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index df16c40..e141800 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -31,6 +31,9 @@ import org.apache.falcon.hive.util.FileUtils;
import org.apache.falcon.hive.util.HiveDRStatusStore;
import org.apache.falcon.hive.util.HiveDRUtils;
import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.falcon.job.JobCounters;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -99,7 +103,19 @@ public class HiveDRTool extends Configured implements Tool {
}
try {
- execute();
+ Job job = execute();
+ if ((job != null) && (inputOptions.getExecutionStage().equalsIgnoreCase(
+ HiveDRUtils.ExecutionStage.EXPORT.name()))) {
+ if ((job.getStatus().getState() == JobStatus.State.SUCCEEDED)
+ && (job.getConfiguration().get("counterLogDir") != null)) {
+ LOG.info("Obtaining job replication counters for Hive DR job");
+ Path counterFile = new Path(job.getConfiguration().get("counterLogDir"), "counter.txt");
+ JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType(
+ JobType.HIVEREPLICATION.name());
+ hiveReplicationCounters.obtainJobCounters(job.getConfiguration(), job, false);
+ hiveReplicationCounters.storeJobCounters(job.getConfiguration(), counterFile);
+ }
+ }
} catch (Exception e) {
System.err.println("Exception encountered " + e.getMessage());
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 5eb8acb..08e0551 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -22,6 +22,7 @@ import org.apache.falcon.hive.HiveDRArgs;
import org.apache.falcon.hive.util.EventUtils;
import org.apache.falcon.hive.util.HiveDRUtils;
import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.falcon.job.ReplicationJobCountersList;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
@@ -70,6 +71,16 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
context.write(new Text(rs.getJobName()), new Text(rs.toString()));
}
}
+
+ // In case of export stage, populate custom counters
+ if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
+ .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
+ && !eventUtils.isCountersMapEmtpy()) {
+ context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
+ eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
+ context.getCounter(ReplicationJobCountersList.COPY).increment(
+ eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName()));
+ }
}
protected void cleanup(Context context) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index f8397ff..d075bfb 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -27,11 +27,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hive.hcatalog.api.repl.Command;
import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,9 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
/**
@@ -80,6 +83,8 @@ public class EventUtils {
private Statement sourceStatement = null;
private Statement targetStatement = null;
+ private Map<String, Long> countersMap = null;
+
private List<ReplicationStatus> listReplicationStatus;
public EventUtils(Configuration conf) {
@@ -97,6 +102,7 @@ public class EventUtils {
targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
sourceCleanUpList = new ArrayList<Path>();
targetCleanUpList = new ArrayList<Path>();
+ countersMap = new HashMap<>();
}
public void setupConnection() throws Exception {
@@ -312,6 +318,9 @@ public class EventUtils {
Job distcpJob = distCp.execute();
LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
LOG.info("Completed DistCp");
+ if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) {
+ countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob);
+ }
}
public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
@@ -338,6 +347,14 @@ public class EventUtils {
return distcpOptions;
}
+ public Long getCounterValue(String counterKey) {
+ return countersMap.get(counterKey);
+ }
+
+ public boolean isCountersMapEmtpy() {
+ return countersMap.size() == 0 ? true : false;
+ }
+
public void cleanEventsDirectory() throws IOException {
LOG.info("Cleaning staging directory");
cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
index d9d6ab0..dff0803 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
@@ -18,13 +18,18 @@
package org.apache.falcon.hive.util;
+import org.apache.falcon.job.JobCounters;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Shell;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* Hive replication utility class.
@@ -83,4 +88,12 @@ public final class HiveDRUtils {
}
return path;
}
+
+ public static Map<String, Long> fetchReplicationCounters(Configuration conf,
+ Job job) throws IOException, InterruptedException {
+ JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType(
+ JobType.HIVEREPLICATION.name());
+ hiveReplicationCounters.obtainJobCounters(conf, job, true);
+ return hiveReplicationCounters.getCountersMap();
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
index 74902b4..0494cf6 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -244,6 +244,8 @@
<arg>${drJobName}-${nominalTime}</arg>
<arg>-executionStage</arg>
<arg>export</arg>
+ <arg>-counterLogDir</arg>
+ <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
</java>
<ok to="import-dr-replication"/>
<error to="fail"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
index 72d40a3..296e049 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
@@ -158,6 +158,8 @@
<arg>${drJobName}-${nominalTime}</arg>
<arg>-executionStage</arg>
<arg>export</arg>
+ <arg>-counterLogDir</arg>
+ <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
</java>
<ok to="import-dr-replication"/>
<error to="fail"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
new file mode 100644
index 0000000..847ac34
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Obtain and store Hive Replication counters.
+ */
+public class HiveReplicationCounters extends JobCounters {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveReplicationCounters.class);
+
+ public HiveReplicationCounters() {
+ super();
+ }
+
+
+ protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException {
+ if (isDistCp) {
+ populateReplicationCountersMap(jobCounters);
+ } else {
+ populateCustomCountersMap(jobCounters);
+ }
+ }
+
+ private void populateCustomCountersMap(Counters jobCounters) {
+ for (ReplicationJobCountersList counterVal : ReplicationJobCountersList.values()) {
+ if (counterVal == ReplicationJobCountersList.TIMETAKEN) {
+ continue;
+ }
+
+ Counter counter = jobCounters.findCounter(counterVal);
+ if (counter != null) {
+ String counterName = counter.getName();
+ long counterValue = counter.getValue();
+ countersMap.put(counterName, counterValue);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
index e8b68ff..391c4a2 100644
--- a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
@@ -32,6 +32,8 @@ public final class JobCountersHandler {
public static JobCounters getCountersType(String jobType) {
if (jobType.equals(JobType.FSREPLICATION.name())) {
return new FSReplicationCounters();
+ } else if (jobType.equals(JobType.HIVEREPLICATION.name())) {
+ return new HiveReplicationCounters();
}
LOG.error("JobType is not supported:" + jobType);
http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobType.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java
index 456e57f..ba45f76 100644
--- a/metrics/src/main/java/org/apache/falcon/job/JobType.java
+++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java
@@ -22,5 +22,6 @@ package org.apache.falcon.job;
* Types of the job for which counters need to obtain.
*/
public enum JobType {
- FSREPLICATION
+ FSREPLICATION,
+ HIVEREPLICATION
}