You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/19 20:23:23 UTC

falcon git commit: FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed

Repository: falcon
Updated Branches:
  refs/heads/master 73339264d -> ec4a273a8


FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed

Author: Venkat Ranganathan <ve...@hortonworks.com>

Reviewers: "Praveen Adlakha <ad...@gmail.com>, Balu Vellanki <ba...@apache.org>"

Closes #230 from vrangan/FALCON-2075


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ec4a273a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ec4a273a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ec4a273a

Branch: refs/heads/master
Commit: ec4a273a8776ebdd1c50e062cad46c981b1d0122
Parents: 7333926
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Tue Jul 19 13:23:20 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue Jul 19 13:23:20 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/hive/mapreduce/CopyMapper.java    | 14 ++++++++++++--
 .../apache/falcon/hive/mapreduce/CopyReducer.java   | 16 +++++++++++++++-
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ec4a273a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index e2297ef..5cd7e74 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Map class for Hive DR.
@@ -40,6 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
     private EventUtils eventUtils;
+    ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -54,15 +57,22 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     @Override
     protected void map(LongWritable key, Text value,
-                       Context context) throws IOException, InterruptedException {
+                       final Context context) throws IOException, InterruptedException {
         LOG.debug("Processing Event value: {}", value.toString());
-
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy mapper progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
         try {
             eventUtils.processEvents(value.toString());
         } catch (Exception e) {
             LOG.error("Exception in processing events:", e);
             throw new IOException(e);
         } finally {
+            timer.shutdownNow();
             cleanup(context);
         }
         List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();

http://git-wip-us.apache.org/repos/asf/falcon/blob/ec4a273a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 50cb4b2..f4bb31c 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -35,12 +35,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Reducer class for Hive DR.
  */
 public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     private DRStatusStore hiveDRStore;
+    private ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -62,9 +65,18 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     }
 
     @Override
-    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(Text key, Iterable<Text> values, final Context context)
+            throws IOException, InterruptedException {
         List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
         ReplicationStatus rs;
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy reducer progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
+
         try {
             for (Text value : values) {
                 String[] fields = (value.toString()).split("\t");
@@ -76,6 +88,8 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
             hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
         } catch (HiveReplicationException e) {
             throw new IOException(e);
+        } finally {
+            timer.shutdownNow();
         }
     }