You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/15 22:18:27 UTC

git commit: CRUNCH-263: Provide sensible defaults for the max split size of CrunchCombineFileInputFormat

Updated Branches:
  refs/heads/master fee4d1654 -> ac17f4f72


CRUNCH-263: Provide sensible defaults for the max split size of CrunchCombineFileInputFormat


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac17f4f7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac17f4f7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac17f4f7

Branch: refs/heads/master
Commit: ac17f4f72d2b26433ef79fec6463a7b122440ccc
Parents: fee4d16
Author: Josh Wills <jw...@apache.org>
Authored: Fri Sep 6 17:59:48 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Sep 15 13:17:07 2013 -0700

----------------------------------------------------------------------
 .../impl/mr/run/CrunchCombineFileInputFormat.java    | 15 +++++++++++----
 .../apache/crunch/impl/mr/run/CrunchInputFormat.java |  2 +-
 .../apache/crunch/impl/mr/run/RuntimeParameters.java |  2 ++
 3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
index 151d8b0..2413ccf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
@@ -17,20 +17,27 @@
  */
 package org.apache.crunch.impl.mr.run;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import java.io.IOException;
 
 public class CrunchCombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
-  private FileInputFormat<K, V> inputFormat;
 
-  public CrunchCombineFileInputFormat(FileInputFormat<K, V> inputFormat) {
-    this.inputFormat = inputFormat;
+  public CrunchCombineFileInputFormat(JobContext jobContext) {
+    if (getMaxSplitSize(jobContext) == Long.MAX_VALUE) {
+      Configuration conf = jobContext.getConfiguration();
+      if (conf.get(RuntimeParameters.COMBINE_FILE_BLOCK_SIZE) != null) {
+        setMaxSplitSize(conf.getLong(RuntimeParameters.COMBINE_FILE_BLOCK_SIZE, 0));
+      } else {
+        setMaxSplitSize(jobContext.getConfiguration().getLong("dfs.block.size", 134217728L));
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index fa4602a..0c6f5e1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -53,7 +53,7 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
       InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
           jobCopy.getConfiguration());
       if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) {
-        format = new CrunchCombineFileInputFormat<Object, Object>((FileInputFormat) format);
+        format = new CrunchCombineFileInputFormat<Object, Object>(job);
       }
       for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
         Integer nodeIndex = nodeEntry.getKey();

http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 8912897..7dc8521 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -30,6 +30,8 @@ public class RuntimeParameters {
 
   public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file";
 
+  public static final String COMBINE_FILE_BLOCK_SIZE = "crunch.combine.file.block.size";
+
   public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
 
   // Not instantiated