You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/05/03 23:52:19 UTC

kudu git commit: mapreduce: add support for fault tolerant scanner

Repository: kudu
Updated Branches:
  refs/heads/master 75333640b -> b797913a6


mapreduce: add support for fault tolerant scanner

This adds support for using fault-tolerant scanners in MapReduce jobs.
By default, a non-fault-tolerant scanner is used. To turn on
fault-tolerant scanning, set the following job config to true:
'kudu.mapreduce.input.fault.tolerant.scan'.

Change-Id: Ibc39472e2733bab4e00e73658f8a7619153bd7c6
Reviewed-on: http://gerrit.cloudera.org:8080/6745
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b797913a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b797913a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b797913a

Branch: refs/heads/master
Commit: b797913a60c21d75f2c816101a8ff7bb57137c39
Parents: 7533364
Author: hahao <ha...@cloudera.com>
Authored: Thu Apr 27 00:40:10 2017 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Wed May 3 23:52:00 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/mapreduce/KuduTableInputFormat.java   | 11 ++++++++++-
 .../apache/kudu/mapreduce/KuduTableMapReduceUtil.java | 14 +++++++++++++-
 .../org/apache/kudu/mapreduce/ITInputFormatJob.java   |  3 ++-
 3 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b797913a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
index 8f98170..6472430 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -88,6 +88,12 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
   /** Job parameter that specifies if the scanner should cache blocks or not (default: false). */
   static final String SCAN_CACHE_BLOCKS = "kudu.mapreduce.input.scan.cache.blocks";
 
+  /**
+   * Job parameter that specifies if the scanner should be fault tolerant
+   * or not (default: false).
+   */
+  static final String FAULT_TOLERANT_SCAN = "kudu.mapreduce.input.fault.tolerant.scan";
+
   /** Job parameter that specifies where the masters are. */
   static final String MASTER_ADDRESSES_KEY = "kudu.mapreduce.master.address";
 
@@ -123,6 +129,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
   private long operationTimeoutMs;
   private String nameServer;
   private boolean cacheBlocks;
+  private boolean isFaultTolerant;
   private List<String> projectedCols;
   private List<KuduPredicate> predicates;
 
@@ -137,7 +144,8 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table)
                                                       .setProjectedColumnNames(projectedCols)
                                                       .cacheBlocks(cacheBlocks)
-                                                      .setTimeout(operationTimeoutMs);
+                                                      .setTimeout(operationTimeoutMs)
+                                                      .setFaultTolerant(isFaultTolerant);
       for (KuduPredicate predicate : predicates) {
         tokenBuilder.addPredicate(predicate);
       }
@@ -214,6 +222,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client);
     this.nameServer = conf.get(NAME_SERVER_KEY);
     this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
+    this.isFaultTolerant = conf.getBoolean(FAULT_TOLERANT_SCAN, false);
 
     try {
       this.table = client.openTable(tableName);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b797913a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
index 6c8287e..cb6bc8d 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java
@@ -186,6 +186,7 @@ public class KuduTableMapReduceUtil {
     protected long operationTimeoutMs = AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
     protected final String columnProjection;
     protected boolean cacheBlocks;
+    protected boolean isFaultTolerant;
     protected List<KuduPredicate> predicates = new ArrayList<>();
 
     /**
@@ -202,7 +203,7 @@ public class KuduTableMapReduceUtil {
 
     /**
      * Sets the block caching configuration for the scanners. Turned off by default.
-     * @param cacheBlocks whether the job should use scanners that cache blocks.
+     * @param cacheBlocks whether the job should use scanners that cache blocks
      * @return this instance
      */
     public S cacheBlocks(boolean cacheBlocks) {
@@ -211,6 +212,16 @@ public class KuduTableMapReduceUtil {
     }
 
     /**
+     * Sets the fault tolerance configuration for the scanners. Turned off by default.
+     * @param isFaultTolerant whether the job should use fault tolerant scanners
+     * @return this instance
+     */
+    public S isFaultTolerant(boolean isFaultTolerant) {
+      this.isFaultTolerant = isFaultTolerant;
+      return (S) this;
+    }
+
+    /**
      * Configures the job with all the passed parameters.
      * @throws IOException If addDependencies is enabled and a problem is encountered reading
      * files on the filesystem
@@ -224,6 +235,7 @@ public class KuduTableMapReduceUtil {
       conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, table);
       conf.setLong(KuduTableInputFormat.OPERATION_TIMEOUT_MS_KEY, operationTimeoutMs);
       conf.setBoolean(KuduTableInputFormat.SCAN_CACHE_BLOCKS, cacheBlocks);
+      conf.setBoolean(KuduTableInputFormat.FAULT_TOLERANT_SCAN, isFaultTolerant);
 
       if (columnProjection != null) {
         conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b797913a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
index f4b1d4f..94230c6 100644
--- a/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
+++ b/java/kudu-mapreduce/src/test/java/org/apache/kudu/mapreduce/ITInputFormatJob.java
@@ -103,7 +103,8 @@ public class ITInputFormatJob extends BaseKuduTest {
             getMasterAddresses())
             .operationTimeoutMs(DEFAULT_SLEEP)
             .addDependencies(false)
-            .cacheBlocks(false);
+            .cacheBlocks(false)
+            .isFaultTolerant(false);
     for (KuduPredicate predicate : predicates) {
       configurator.addPredicate(predicate);
     }