Posted to commits@falcon.apache.org by aj...@apache.org on 2015/11/26 12:44:11 UTC

falcon git commit: FALCON-1480 Gather data transfer details of Hive DR. Contributed by Peeyush Bishnoi.

Repository: falcon
Updated Branches:
  refs/heads/master 2bf90130d -> baab41425


FALCON-1480 Gather data transfer details of Hive DR. Contributed by Peeyush Bishnoi.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/baab4142
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/baab4142
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/baab4142

Branch: refs/heads/master
Commit: baab41425fbdf2e7e6dc4d55195e826d540b444c
Parents: 2bf9013
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Nov 26 15:27:54 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Nov 26 17:12:13 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 addons/hivedr/pom.xml                           |  5 +-
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  3 +-
 .../java/org/apache/falcon/hive/HiveDRTool.java | 18 +++++-
 .../falcon/hive/mapreduce/CopyMapper.java       | 11 ++++
 .../org/apache/falcon/hive/util/EventUtils.java | 19 +++++-
 .../apache/falcon/hive/util/HiveDRUtils.java    | 13 ++++
 .../hive-disaster-recovery-secure-workflow.xml  |  2 +
 .../hive-disaster-recovery-workflow.xml         |  2 +
 .../falcon/job/HiveReplicationCounters.java     | 62 ++++++++++++++++++++
 .../apache/falcon/job/JobCountersHandler.java   |  2 +
 .../java/org/apache/falcon/job/JobType.java     |  3 +-
 12 files changed, 139 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69aa1f4..a2ca7f9 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,11 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1480 Gather data transfer details of Hive DR. (Peeyush Bishnoi via Ajay Yadava)
+    
     FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao)
+    
+    FALCON-1480 Gather data transfer details of Hive DR(Peeyush Bishnoi via Ajay Yadava)
 
     FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index f98e8c4..346ffdb 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -99,7 +99,10 @@
             <artifactId>falcon-hadoop-dependencies</artifactId>
             <scope>test</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-metrics</artifactId>
+        </dependency>
     </dependencies>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 574524d..5490232 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -78,7 +78,8 @@ public enum HiveDRArgs {
     FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
 
     KEEP_HISTORY("keepHistory", "Keep history of events file generated", false),
-    EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false);
+    EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false),
+    COUNTER_LOGDIR("counterLogDir", "Log directory to store counter file", false);
 
     private final String name;
     private final String description;

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index df16c40..e141800 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -31,6 +31,9 @@ import org.apache.falcon.hive.util.FileUtils;
 import org.apache.falcon.hive.util.HiveDRStatusStore;
 import org.apache.falcon.hive.util.HiveDRUtils;
 import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.falcon.job.JobCounters;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +43,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -99,7 +103,19 @@ public class HiveDRTool extends Configured implements Tool {
         }
 
         try {
-            execute();
+            Job job = execute();
+            if ((job != null) && (inputOptions.getExecutionStage().equalsIgnoreCase(
+                    HiveDRUtils.ExecutionStage.EXPORT.name()))) {
+                if ((job.getStatus().getState() == JobStatus.State.SUCCEEDED)
+                        && (job.getConfiguration().get("counterLogDir") != null)) {
+                    LOG.info("Obtaining job replication counters for Hive DR job");
+                    Path counterFile = new Path(job.getConfiguration().get("counterLogDir"), "counter.txt");
+                    JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType(
+                            JobType.HIVEREPLICATION.name());
+                    hiveReplicationCounters.obtainJobCounters(job.getConfiguration(), job, false);
+                    hiveReplicationCounters.storeJobCounters(job.getConfiguration(), counterFile);
+                }
+            }
         } catch (Exception e) {
             System.err.println("Exception encountered " + e.getMessage());
             e.printStackTrace();
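
For reference, after a successful export stage the block above persists the gathered counters to <counterLogDir>/counter.txt via storeJobCounters(). A minimal, illustrative sketch of reading that file back from the cluster file system; the helper class name is hypothetical, and since this commit does not specify the file's internal layout, each line is printed as-is rather than parsed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    /**
     * Illustrative only: dump the counter file written by HiveDRTool after the
     * export stage. The file's layout is not defined by this commit, so the
     * contents are printed verbatim instead of being parsed.
     */
    public class CounterFileDump {
        public static void dump(Configuration conf, Path counterFile) throws IOException {
            FileSystem fs = counterFile.getFileSystem(conf);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(counterFile), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }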

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 5eb8acb..08e0551 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -22,6 +22,7 @@ import org.apache.falcon.hive.HiveDRArgs;
 import org.apache.falcon.hive.util.EventUtils;
 import org.apache.falcon.hive.util.HiveDRUtils;
 import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.falcon.job.ReplicationJobCountersList;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -70,6 +71,16 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
                 context.write(new Text(rs.getJobName()), new Text(rs.toString()));
             }
         }
+
+        // In case of export stage, populate custom counters
+        if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
+                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
+                && !eventUtils.isCountersMapEmtpy()) {
+            context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
+                    eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
+            context.getCounter(ReplicationJobCountersList.COPY).increment(
+                    eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName()));
+        }
     }
 
     protected void cleanup(Context context) throws IOException, InterruptedException {
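
Once the export-stage job finishes, the custom counters incremented above are visible through the standard Hadoop counters API. A small illustrative sketch (the class name is hypothetical; it assumes the client still holds the Job handle for the export-stage job):

    import org.apache.falcon.job.ReplicationJobCountersList;
    import org.apache.hadoop.mapreduce.Job;

    /**
     * Illustrative only: read the custom replication counters off a completed
     * export-stage job using the same enum that CopyMapper increments.
     */
    public class ExportCounterCheck {
        public static void logCounters(Job job) throws Exception {
            long bytesCopied = job.getCounters()
                    .findCounter(ReplicationJobCountersList.BYTESCOPIED).getValue();
            long copyCount = job.getCounters()
                    .findCounter(ReplicationJobCountersList.COPY).getValue();
            System.out.println("BYTESCOPIED=" + bytesCopied + ", COPY=" + copyCount);
        }
    }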

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index f8397ff..d075bfb 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -27,11 +27,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hive.hcatalog.api.repl.Command;
 import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +45,9 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -80,6 +83,8 @@ public class EventUtils {
     private Statement sourceStatement = null;
     private Statement targetStatement = null;
 
+    private Map<String, Long> countersMap = null;
+
     private List<ReplicationStatus> listReplicationStatus;
 
     public EventUtils(Configuration conf) {
@@ -97,6 +102,7 @@ public class EventUtils {
         targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
         sourceCleanUpList = new ArrayList<Path>();
         targetCleanUpList = new ArrayList<Path>();
+        countersMap = new HashMap<>();
     }
 
     public void setupConnection() throws Exception {
@@ -312,6 +318,9 @@ public class EventUtils {
         Job distcpJob = distCp.execute();
         LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
         LOG.info("Completed DistCp");
+        if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) {
+            countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob);
+        }
     }
 
     public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
@@ -338,6 +347,14 @@ public class EventUtils {
         return distcpOptions;
     }
 
+    public Long getCounterValue(String counterKey) {
+        return countersMap.get(counterKey);
+    }
+
+    public boolean isCountersMapEmtpy() {
+        return countersMap.size() == 0 ? true : false;
+    }
+
     public void cleanEventsDirectory() throws IOException {
         LOG.info("Cleaning staging directory");
         cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
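
The accessors added above (getCounterValue and isCountersMapEmtpy) are what CopyMapper calls to push the DistCp figures into its own counters. A trivial, illustrative debug helper that could sit inside EventUtils to dump the map once it has been populated (the method name is hypothetical; Map and the slf4j LOG are already available in this class):

    // Illustrative only: log whatever fetchReplicationCounters() collected for
    // the last successful DistCp run. Keys match ReplicationJobCountersList names.
    private void logReplicationCounters() {
        if (!isCountersMapEmtpy()) {
            for (Map.Entry<String, Long> entry : countersMap.entrySet()) {
                LOG.info("Replication counter {} = {}", entry.getKey(), entry.getValue());
            }
        }
    }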

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
index d9d6ab0..dff0803 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
@@ -18,13 +18,18 @@
 
 package org.apache.falcon.hive.util;
 
+import org.apache.falcon.job.JobCounters;
+import org.apache.falcon.job.JobCountersHandler;
+import org.apache.falcon.job.JobType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Shell;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Hive replication utility class.
@@ -83,4 +88,12 @@ public final class HiveDRUtils {
         }
         return path;
     }
+
+    public static Map<String, Long> fetchReplicationCounters(Configuration conf,
+                                                             Job job) throws IOException, InterruptedException {
+        JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType(
+                JobType.HIVEREPLICATION.name());
+        hiveReplicationCounters.obtainJobCounters(conf, job, true);
+        return hiveReplicationCounters.getCountersMap();
+    }
 }
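
fetchReplicationCounters() is the helper EventUtils calls right after DistCp succeeds; it dispatches to the new HiveReplicationCounters class and returns the counters keyed by name. A minimal usage sketch (class and method names here are hypothetical, assuming a handle to the completed DistCp job):

    import org.apache.falcon.hive.util.HiveDRUtils;
    import org.apache.falcon.job.ReplicationJobCountersList;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import java.util.Map;

    /**
     * Illustrative only: harvest replication counters from a finished DistCp job
     * using the utility added in this commit.
     */
    public class FetchCountersExample {
        public static void report(Configuration conf, Job distcpJob) throws Exception {
            Map<String, Long> counters = HiveDRUtils.fetchReplicationCounters(conf, distcpJob);
            Long bytesCopied = counters.get(ReplicationJobCountersList.BYTESCOPIED.getName());
            System.out.println("BYTESCOPIED=" + bytesCopied);
        }
    }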

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
index 74902b4..0494cf6 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -244,6 +244,8 @@
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
+            <arg>-counterLogDir</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
         </java>
         <ok to="import-dr-replication"/>
         <error to="fail"/>
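
For orientation, the two arguments added above hand the export-stage Java action a per-run directory for the counter file. With purely illustrative values logDir=/apps/falcon/hive-dr/logs, nominalTime=2015-11-26-12-00 and srcClusterName=sourceCluster, the action would be invoked with something like:

    -counterLogDir /apps/falcon/hive-dr/logs/job-2015-11-26-12-00/sourceCluster/

When srcClusterName is 'NA', the EL expression collapses the cluster segment to an empty string. The same pair of arguments is added to the non-secure workflow below.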

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
index 72d40a3..296e049 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
@@ -158,6 +158,8 @@
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
             <arg>export</arg>
+            <arg>-counterLogDir</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg>
         </java>
         <ok to="import-dr-replication"/>
         <error to="fail"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
new file mode 100644
index 0000000..847ac34
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/job/HiveReplicationCounters.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.job;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Obtain and store Hive Replication counters.
+ */
+public class HiveReplicationCounters extends JobCounters {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveReplicationCounters.class);
+
+    public HiveReplicationCounters() {
+        super();
+    }
+
+
+    protected void parseJob(Job job, Counters jobCounters, boolean isDistCp) throws IOException {
+        if (isDistCp) {
+            populateReplicationCountersMap(jobCounters);
+        } else {
+            populateCustomCountersMap(jobCounters);
+        }
+    }
+
+    private void populateCustomCountersMap(Counters jobCounters) {
+        for (ReplicationJobCountersList counterVal : ReplicationJobCountersList.values()) {
+            if (counterVal == ReplicationJobCountersList.TIMETAKEN) {
+                continue;
+            }
+
+            Counter counter = jobCounters.findCounter(counterVal);
+            if (counter != null) {
+                String counterName = counter.getName();
+                long counterValue = counter.getValue();
+                countersMap.put(counterName, counterValue);
+            }
+        }
+    }
+}
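
Note on the isDistCp flag: in this commit parseJob() runs with isDistCp=true when EventUtils harvests the finished DistCp job (via HiveDRUtils.fetchReplicationCounters), and with isDistCp=false when HiveDRTool reads back the custom ReplicationJobCountersList counters that CopyMapper incremented on the export-stage job; populateReplicationCountersMap() is not declared in this file and comes from elsewhere in the JobCounters hierarchy. A minimal sketch of the dispatch, with a hypothetical helper class and method name, assuming a completed Job handle:

    import org.apache.falcon.job.JobCounters;
    import org.apache.falcon.job.JobCountersHandler;
    import org.apache.falcon.job.JobType;
    import org.apache.hadoop.mapreduce.Job;

    /**
     * Illustrative only: how the new counter class gets selected and populated.
     * isDistCp=true harvests the DistCp job's built-in counters; isDistCp=false
     * reads the custom ReplicationJobCountersList counters off the wrapper job.
     */
    public class HiveReplicationCountersUsage {
        public static JobCounters collect(Job job, boolean isDistCp) throws Exception {
            JobCounters counters = JobCountersHandler.getCountersType(JobType.HIVEREPLICATION.name());
            counters.obtainJobCounters(job.getConfiguration(), job, isDistCp);
            return counters;
        }
    }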

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
index e8b68ff..391c4a2 100644
--- a/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
+++ b/metrics/src/main/java/org/apache/falcon/job/JobCountersHandler.java
@@ -32,6 +32,8 @@ public final class JobCountersHandler {
     public static JobCounters getCountersType(String jobType) {
         if (jobType.equals(JobType.FSREPLICATION.name())) {
             return new FSReplicationCounters();
+        } else if (jobType.equals(JobType.HIVEREPLICATION.name())) {
+            return new HiveReplicationCounters();
         }
 
         LOG.error("JobType is not supported:" + jobType);

http://git-wip-us.apache.org/repos/asf/falcon/blob/baab4142/metrics/src/main/java/org/apache/falcon/job/JobType.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/job/JobType.java b/metrics/src/main/java/org/apache/falcon/job/JobType.java
index 456e57f..ba45f76 100644
--- a/metrics/src/main/java/org/apache/falcon/job/JobType.java
+++ b/metrics/src/main/java/org/apache/falcon/job/JobType.java
@@ -22,5 +22,6 @@ package org.apache.falcon.job;
  * Types of the job for which counters need to obtain.
  */
 public enum JobType {
-    FSREPLICATION
+    FSREPLICATION,
+    HIVEREPLICATION
 }