You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/02 18:11:14 UTC

git commit: CRUNCH-165: The latest attempt at using CombineFileInputFormat wherever possible.

Updated Branches:
  refs/heads/master ae748c8ea -> b7781ca08


CRUNCH-165: The latest attempt at using CombineFileInputFormat wherever possible.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b7781ca0
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b7781ca0
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b7781ca0

Branch: refs/heads/master
Commit: b7781ca0858b6eee0ff7c7c2ccbe0c49f247db5d
Parents: ae748c8
Author: Josh Wills <jw...@apache.org>
Authored: Fri Aug 9 16:30:37 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 2 08:41:45 2013 -0700

----------------------------------------------------------------------
 .../contrib/io/jdbc/DataBaseSourceIT.java       |  41 -------
 .../crunch/contrib/io/jdbc/DataBaseSource.java  |  15 ++-
 .../contrib/io/jdbc/IdentifiableName.java       |  56 ++++++++++
 .../org/apache/crunch/io/CombineFileIT.java     |  52 +++++++++
 .../apache/crunch/io/CombineFileITData/src1.txt |   4 +
 .../apache/crunch/io/CombineFileITData/src2.txt |   4 +
 .../mr/run/CrunchCombineFileInputFormat.java    |  37 +++++++
 .../crunch/impl/mr/run/CrunchInputFormat.java   |   3 +
 .../crunch/impl/mr/run/CrunchRecordReader.java  | 111 ++++++++++++++++---
 .../crunch/impl/mr/run/RuntimeParameters.java   |   4 +-
 .../apache/crunch/io/impl/FileSourceImpl.java   |  19 ++--
 .../apache/crunch/io/text/NLineFileSource.java  |   3 +
 .../crunch/io/avro/AvroFileSourceTest.java      |  12 +-
 13 files changed, 288 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
index 8fdb22d..1c48559 100644
--- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
@@ -19,28 +19,17 @@ package org.apache.crunch.contrib.io.jdbc;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
-import java.io.IOException;
 import java.io.Serializable;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
-import org.apache.crunch.contrib.io.jdbc.DataBaseSource;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.CrunchTestSupport;
 import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.h2.tools.RunScript;
 import org.h2.tools.Server;
 import org.junit.After;
@@ -65,36 +54,6 @@ public class DataBaseSourceIT extends CrunchTestSupport implements Serializable
     server.stop();
   }
 
-  public static class IdentifiableName implements DBWritable, Writable {
-
-    public IntWritable id = new IntWritable();
-    public Text name = new Text();
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      id.readFields(in);
-      name.readFields(in);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      id.write(out);
-      name.write(out);
-    }
-
-    @Override
-    public void readFields(ResultSet resultSet) throws SQLException {
-      id.set(resultSet.getInt(1));
-      name.set(resultSet.getString(2));
-    }
-
-    @Override
-    public void write(PreparedStatement preparedStatement) throws SQLException {
-      throw new UnsupportedOperationException("Not implemented");
-    }
-
-  }
-
   @Test
   public void testReadFromSource() throws Exception {
     Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class);

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 83f509f..337ecb7 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -21,10 +21,13 @@ import java.io.IOException;
 import java.sql.Driver;
 
 import org.apache.crunch.Source;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -104,8 +107,16 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T
   public void configureSource(Job job, int inputId) throws IOException {
     Configuration configuration = job.getConfiguration();
     DBConfiguration.configureDB(configuration, driverClass, url, username, password);
-    job.setInputFormatClass(DBInputFormat.class);
-    DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+    if (inputId == -1) {
+      job.setInputFormatClass(DBInputFormat.class);
+      DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+    } else {
+      FormatBundle<DBInputFormat> bundle = FormatBundle.forInput(DBInputFormat.class)
+          .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getName()) // binary name, loadable by name lookup; canonical name breaks nested classes
+          .set(DBConfiguration.INPUT_QUERY, selectClause)
+          .set(DBConfiguration.INPUT_COUNT_QUERY, countClause);
+      CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
new file mode 100644
index 0000000..c8a452a
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class IdentifiableName implements DBWritable, Writable {
+
+  public IntWritable id = new IntWritable();
+  public Text name = new Text();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id.readFields(in);
+    name.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    name.write(out);
+  }
+
+  @Override
+  public void readFields(ResultSet resultSet) throws SQLException {
+    id.set(resultSet.getInt(1));
+    name.set(resultSet.getString(2));
+  }
+
+  @Override
+  public void write(PreparedStatement preparedStatement) throws SQLException {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
new file mode 100644
index 0000000..efcec0c
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.io;
+
+import com.google.common.io.Files;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CombineFileIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testCombine() throws Exception {
+    File srcFiles = tmpDir.getFile("srcs");
+    File outputFiles = tmpDir.getFile("out");
+    assertTrue(srcFiles.mkdir());
+    File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt"));
+    File src2 = tmpDir.copyResourceFile(Tests.resource(this, "src2.txt"));
+    Files.copy(src1, new File(srcFiles, "src1.txt"));
+    Files.copy(src2, new File(srcFiles, "src2.txt"));
+
+    MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath());
+    in.write(To.textFile(outputFiles.getAbsolutePath()));
+    p.done();
+    assertEquals(4, outputFiles.listFiles().length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
new file mode 100644
index 0000000..9f38eb9
--- /dev/null
+++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
@@ -0,0 +1,4 @@
+a,1-1
+b,1-2
+c,1-3
+a,1-4

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
new file mode 100644
index 0000000..ed9524e
--- /dev/null
+++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
@@ -0,0 +1,4 @@
+b,2-1
+c,2-2
+c,2-3
+d,2-4

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
new file mode 100644
index 0000000..27eaf94
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+
+public class CrunchCombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> { // used only to compute combined splits; createRecordReader is unsupported
+  private final FileInputFormat<K, V> inputFormat; // NOTE(review): assigned once, never read in this class
+
+  public CrunchCombineFileInputFormat(FileInputFormat<K, V> inputFormat) {
+    this.inputFormat = inputFormat;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
+    throw new UnsupportedOperationException("CrunchCombineFileInputFormat.createRecordReader should never be called");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index cf3df81..fa4602a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -52,6 +52,9 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
       Job jobCopy = new Job(conf);
       InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
           jobCopy.getConfiguration());
+      if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) {
+        format = new CrunchCombineFileInputFormat<Object, Object>((FileInputFormat) format);
+      }
       for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
         Integer nodeIndex = nodeEntry.getKey();
         List<Path> paths = nodeEntry.getValue();

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index e5cbd95..32b3f74 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -25,61 +25,144 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
 class CrunchRecordReader<K, V> extends RecordReader<K, V> {
 
-  private final RecordReader<K, V> delegate;
+  private RecordReader<K, V> curReader;
+  private CrunchInputSplit crunchSplit;
+  private CombineFileSplit combineFileSplit;
+  private TaskAttemptContext context;
+  private int idx;
+  private long progress;
 
   public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
       InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+    this.crunchSplit = (CrunchInputSplit) inputSplit;
+    if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
+      combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+    }
+    this.context = context;
     Configuration conf = crunchSplit.getConf();
     if (conf == null) {
       conf = context.getConfiguration();
       crunchSplit.setConf(conf);
     }
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
+    initNextRecordReader();
+  }
+
+  private boolean initNextRecordReader() throws IOException, InterruptedException {
+    if (combineFileSplit != null) {
+      if (curReader != null) {
+        curReader.close();
+        curReader = null;
+        if (idx > 0) {
+          progress += combineFileSplit.getLength(idx - 1);
+        }
+      }
+      // if all chunks have been processed, nothing more to do.
+      if (idx == combineFileSplit.getNumPaths()) {
+        return false;
+      }
+    } else if (idx > 0) {
+      return false;
+    }
+
+    idx++;
+    Configuration conf = crunchSplit.getConf();
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
+        crunchSplit.getInputFormatClass(),
         conf);
-    this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
+    this.curReader = inputFormat.createRecordReader(getDelegateSplit(),
         TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
+    return true;
+  }
+
+  private InputSplit getDelegateSplit() throws IOException {
+    if (combineFileSplit != null) {
+      return new FileSplit(combineFileSplit.getPath(idx - 1),
+          combineFileSplit.getOffset(idx - 1),
+          combineFileSplit.getLength(idx - 1),
+          combineFileSplit.getLocations());
+    } else {
+      return crunchSplit.getInputSplit();
+    }
   }
 
   @Override
   public void close() throws IOException {
-    delegate.close();
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
   }
 
   @Override
   public K getCurrentKey() throws IOException, InterruptedException {
-    return delegate.getCurrentKey();
+    return curReader.getCurrentKey();
   }
 
   @Override
   public V getCurrentValue() throws IOException, InterruptedException {
-    return delegate.getCurrentValue();
+    return curReader.getCurrentValue();
   }
 
   @Override
   public float getProgress() throws IOException, InterruptedException {
-    return delegate.getProgress();
+    float curProgress = 0;    // bytes processed in current split
+    if (null != curReader) {
+      curProgress = (float)(curReader.getProgress() * getCurLength());
+    }
+    return Math.min(1.0f,  (progress + curProgress)/getOverallLength());
+  }
+
+  private long getCurLength() {
+    if (combineFileSplit == null) {
+      return 1L;
+    } else {
+      return combineFileSplit.getLength(idx - 1);
+    }
+  }
+
+  private float getOverallLength() {
+    if (combineFileSplit == null) {
+      return 1.0f;
+    } else {
+      return Math.max(1.0f, (float) combineFileSplit.getLength()); // floor at 1 byte: an all-empty combined split reports 0 progress instead of NaN (0/0)
+    }
   }
 
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+    this.crunchSplit = (CrunchInputSplit) inputSplit;
+    this.context = context;
     Configuration conf = crunchSplit.getConf();
     if (conf == null) {
       conf = context.getConfiguration();
+      crunchSplit.setConf(conf);
+    }
+    if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
+      combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+    }
+    if (curReader != null) {
+      curReader.initialize(getDelegateSplit(),
+          TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
     }
-    InputSplit delegateSplit = crunchSplit.getInputSplit();
-    delegate.initialize(delegateSplit,
-        TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
   }
 
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    return delegate.nextKeyValue();
+    while ((curReader == null) || !curReader.nextKeyValue()) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+      if (curReader != null) {
+        curReader.initialize(getDelegateSplit(),
+            TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
+      }
+    }
+    return true;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 604c49c..8912897 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -22,14 +22,14 @@ package org.apache.crunch.impl.mr.run;
  */
 public class RuntimeParameters {
 
-  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-
   public static final String DEBUG = "crunch.debug";
 
   public static final String TMP_DIR = "crunch.tmp.dir";
 
   public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
 
+  public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file";
+
   public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
 
   // Not instantiated

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 13645ba..a3cbdc8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.io.CompositePathIterable;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FileReaderFactory;
@@ -91,19 +92,17 @@ public class FileSourceImpl<T> implements Source<T> {
   
   @Override
   public void configureSource(Job job, int inputId) throws IOException {
-    if (inputId == -1) {
-      for (Path path : paths) {
-        FileInputFormat.addInputPath(job, path);
-      }
-      job.setInputFormatClass(inputBundle.getFormatClass());
-      inputBundle.configure(job.getConfiguration());
-    } else {
-      for (Path path : paths) {
-        CrunchInputs.addInputPath(job, path, inputBundle, inputId);
-      }
+    // Use Crunch to handle the combined input splits
+    job.setInputFormatClass(CrunchInputFormat.class);
+    for (Path path : paths) {
+      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
     }
   }
 
+  public FormatBundle<? extends InputFormat> getBundle() {
+    return inputBundle;
+  }
+
   @Override
   public PType<T> getType() {
     return ptype;

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index abef771..0756b70 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -20,6 +20,8 @@ package org.apache.crunch.io.text;
 import java.io.IOException;
 
 import java.util.List;
+
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
@@ -39,6 +41,7 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou
   private static FormatBundle getBundle(int linesPerTask) {
     FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class);
     bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask));
+    bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
     return bundle;
   }
   

http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
index ceef2b2..a8ee398 100644
--- a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.types.avro.AvroType;
@@ -58,7 +60,8 @@ public class AvroFileSourceTest {
     AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(tempFile.getAbsolutePath()),
         avroSpecificType);
 
-    personFileSource.configureSource(job, -1);
+    FormatBundle bundle = personFileSource.getBundle();
+    bundle.configure(job.getConfiguration());
 
     assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
     assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
@@ -70,7 +73,8 @@ public class AvroFileSourceTest {
     AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(tempFile.getAbsolutePath()),
         avroGenericType);
 
-    personFileSource.configureSource(job, -1);
+    FormatBundle bundle = personFileSource.getBundle();
+    bundle.configure(job.getConfiguration());
 
     assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
 
@@ -82,8 +86,8 @@ public class AvroFileSourceTest {
     AvroFileSource<StringWrapper> personFileSource = new AvroFileSource<StringWrapper>(new Path(
         tempFile.getAbsolutePath()), avroReflectType);
 
-    personFileSource.configureSource(job, -1);
-
+    FormatBundle bundle = personFileSource.getBundle();
+    bundle.configure(job.getConfiguration());
     assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
 
   }