You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/07/19 20:23:23 UTC
falcon git commit: FALCON-2075 Falcon HiveDR tasks do not report
progress and can get killed
Repository: falcon
Updated Branches:
refs/heads/master 73339264d -> ec4a273a8
FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed
Author: Venkat Ranganathan <ve...@hortonworks.com>
Reviewers: "Praveen Adlakha <ad...@gmail.com>, Balu Vellanki <ba...@apache.org>"
Closes #230 from vrangan/FALCON-2075
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ec4a273a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ec4a273a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ec4a273a
Branch: refs/heads/master
Commit: ec4a273a8776ebdd1c50e062cad46c981b1d0122
Parents: 7333926
Author: Venkat Ranganathan <ve...@hortonworks.com>
Authored: Tue Jul 19 13:23:20 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue Jul 19 13:23:20 2016 -0700
----------------------------------------------------------------------
.../apache/falcon/hive/mapreduce/CopyMapper.java | 14 ++++++++++++--
.../apache/falcon/hive/mapreduce/CopyReducer.java | 16 +++++++++++++++-
2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ec4a273a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index e2297ef..5cd7e74 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* Map class for Hive DR.
@@ -40,6 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
private EventUtils eventUtils;
+ ScheduledThreadPoolExecutor timer;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -54,15 +57,22 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value,
- Context context) throws IOException, InterruptedException {
+ final Context context) throws IOException, InterruptedException {
LOG.debug("Processing Event value: {}", value.toString());
-
+ timer = new ScheduledThreadPoolExecutor(1);
+ timer.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ System.out.println("Hive DR copy mapper progress heart beat");
+ context.progress();
+ }
+ }, 0, 30, TimeUnit.SECONDS);
try {
eventUtils.processEvents(value.toString());
} catch (Exception e) {
LOG.error("Exception in processing events:", e);
throw new IOException(e);
} finally {
+ timer.shutdownNow();
cleanup(context);
}
List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
http://git-wip-us.apache.org/repos/asf/falcon/blob/ec4a273a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 50cb4b2..f4bb31c 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -35,12 +35,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Reducer class for Hive DR.
*/
public class CopyReducer extends Reducer<Text, Text, Text, Text> {
private DRStatusStore hiveDRStore;
+ private ScheduledThreadPoolExecutor timer;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -62,9 +65,18 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
}
@Override
- protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ protected void reduce(Text key, Iterable<Text> values, final Context context)
+ throws IOException, InterruptedException {
List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
ReplicationStatus rs;
+ timer = new ScheduledThreadPoolExecutor(1);
+ timer.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ System.out.println("Hive DR copy reducer progress heart beat");
+ context.progress();
+ }
+ }, 0, 30, TimeUnit.SECONDS);
+
try {
for (Text value : values) {
String[] fields = (value.toString()).split("\t");
@@ -76,6 +88,8 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
} catch (HiveReplicationException e) {
throw new IOException(e);
+ } finally {
+ timer.shutdownNow();
}
}