You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/02 18:11:14 UTC
git commit: CRUNCH-165: The latest attempt at using CombineFileInputFormat wherever possible.
Updated Branches:
refs/heads/master ae748c8ea -> b7781ca08
CRUNCH-165: The latest attempt at using CombineFileInputFormat wherever possible.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b7781ca0
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b7781ca0
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b7781ca0
Branch: refs/heads/master
Commit: b7781ca0858b6eee0ff7c7c2ccbe0c49f247db5d
Parents: ae748c8
Author: Josh Wills <jw...@apache.org>
Authored: Fri Aug 9 16:30:37 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 2 08:41:45 2013 -0700
----------------------------------------------------------------------
.../contrib/io/jdbc/DataBaseSourceIT.java | 41 -------
.../crunch/contrib/io/jdbc/DataBaseSource.java | 15 ++-
.../contrib/io/jdbc/IdentifiableName.java | 56 ++++++++++
.../org/apache/crunch/io/CombineFileIT.java | 52 +++++++++
.../apache/crunch/io/CombineFileITData/src1.txt | 4 +
.../apache/crunch/io/CombineFileITData/src2.txt | 4 +
.../mr/run/CrunchCombineFileInputFormat.java | 37 +++++++
.../crunch/impl/mr/run/CrunchInputFormat.java | 3 +
.../crunch/impl/mr/run/CrunchRecordReader.java | 111 ++++++++++++++++---
.../crunch/impl/mr/run/RuntimeParameters.java | 4 +-
.../apache/crunch/io/impl/FileSourceImpl.java | 19 ++--
.../apache/crunch/io/text/NLineFileSource.java | 3 +
.../crunch/io/avro/AvroFileSourceTest.java | 12 +-
13 files changed, 288 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
index 8fdb22d..1c48559 100644
--- a/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
+++ b/crunch-contrib/src/it/java/org/apache/crunch/contrib/io/jdbc/DataBaseSourceIT.java
@@ -19,28 +19,17 @@ package org.apache.crunch.contrib.io.jdbc;
import static org.junit.Assert.assertEquals;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.File;
-import java.io.IOException;
import java.io.Serializable;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
-import org.apache.crunch.contrib.io.jdbc.DataBaseSource;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.CrunchTestSupport;
import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.h2.tools.RunScript;
import org.h2.tools.Server;
import org.junit.After;
@@ -65,36 +54,6 @@ public class DataBaseSourceIT extends CrunchTestSupport implements Serializable
server.stop();
}
- public static class IdentifiableName implements DBWritable, Writable {
-
- public IntWritable id = new IntWritable();
- public Text name = new Text();
-
- @Override
- public void readFields(DataInput in) throws IOException {
- id.readFields(in);
- name.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- id.write(out);
- name.write(out);
- }
-
- @Override
- public void readFields(ResultSet resultSet) throws SQLException {
- id.set(resultSet.getInt(1));
- name.set(resultSet.getString(2));
- }
-
- @Override
- public void write(PreparedStatement preparedStatement) throws SQLException {
- throw new UnsupportedOperationException("Not implemented");
- }
-
- }
-
@Test
public void testReadFromSource() throws Exception {
Pipeline pipeline = new MRPipeline(DataBaseSourceIT.class);
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 83f509f..337ecb7 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -21,10 +21,13 @@ import java.io.IOException;
import java.sql.Driver;
import org.apache.crunch.Source;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -104,8 +107,16 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T
public void configureSource(Job job, int inputId) throws IOException {
Configuration configuration = job.getConfiguration();
DBConfiguration.configureDB(configuration, driverClass, url, username, password);
- job.setInputFormatClass(DBInputFormat.class);
- DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+ if (inputId == -1) {
+ job.setInputFormatClass(DBInputFormat.class);
+ DBInputFormat.setInput(job, inputClass, selectClause, countClause);
+ } else {
+ FormatBundle<DBInputFormat> bundle = FormatBundle.forInput(DBInputFormat.class)
+ .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName())
+ .set(DBConfiguration.INPUT_QUERY, selectClause)
+ .set(DBConfiguration.INPUT_COUNT_QUERY, countClause);
+ CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
new file mode 100644
index 0000000..c8a452a
--- /dev/null
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/IdentifiableName.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.contrib.io.jdbc;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class IdentifiableName implements DBWritable, Writable {
+
+ public IntWritable id = new IntWritable();
+ public Text name = new Text();
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id.readFields(in);
+ name.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ id.write(out);
+ name.write(out);
+ }
+
+ @Override
+ public void readFields(ResultSet resultSet) throws SQLException {
+ id.set(resultSet.getInt(1));
+ name.set(resultSet.getString(2));
+ }
+
+ @Override
+ public void write(PreparedStatement preparedStatement) throws SQLException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
new file mode 100644
index 0000000..efcec0c
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.io;
+
+import com.google.common.io.Files;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CombineFileIT {
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Test
+ public void testCombine() throws Exception {
+ File srcFiles = tmpDir.getFile("srcs");
+ File outputFiles = tmpDir.getFile("out");
+ assertTrue(srcFiles.mkdir());
+ File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt"));
+ File src2 = tmpDir.copyResourceFile(Tests.resource(this, "src2.txt"));
+ Files.copy(src1, new File(srcFiles, "src1.txt"));
+ Files.copy(src2, new File(srcFiles, "src2.txt"));
+
+ MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath());
+ in.write(To.textFile(outputFiles.getAbsolutePath()));
+ p.done();
+ assertEquals(4, outputFiles.listFiles().length);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
new file mode 100644
index 0000000..9f38eb9
--- /dev/null
+++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src1.txt
@@ -0,0 +1,4 @@
+a,1-1
+b,1-2
+c,1-3
+a,1-4
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
new file mode 100644
index 0000000..ed9524e
--- /dev/null
+++ b/crunch-core/src/it/resources/org/apache/crunch/io/CombineFileITData/src2.txt
@@ -0,0 +1,4 @@
+b,2-1
+c,2-2
+c,2-3
+d,2-4
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
new file mode 100644
index 0000000..27eaf94
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java
@@ -0,0 +1,37 @@
+/**
+ * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+
+public class CrunchCombineFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
+ private FileInputFormat<K, V> inputFormat;
+
+ public CrunchCombineFileInputFormat(FileInputFormat<K, V> inputFormat) {
+ this.inputFormat = inputFormat;
+ }
+
+ @Override
+ public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
+ throw new UnsupportedOperationException("CrunchCombineFileInputFormat.createRecordReader should never be called");
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index cf3df81..fa4602a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -52,6 +52,9 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
Job jobCopy = new Job(conf);
InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
jobCopy.getConfiguration());
+ if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) {
+ format = new CrunchCombineFileInputFormat<Object, Object>((FileInputFormat) format);
+ }
for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
Integer nodeIndex = nodeEntry.getKey();
List<Path> paths = nodeEntry.getValue();
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index e5cbd95..32b3f74 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -25,61 +25,144 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
class CrunchRecordReader<K, V> extends RecordReader<K, V> {
- private final RecordReader<K, V> delegate;
+ private RecordReader<K, V> curReader;
+ private CrunchInputSplit crunchSplit;
+ private CombineFileSplit combineFileSplit;
+ private TaskAttemptContext context;
+ private int idx;
+ private long progress;
public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
InterruptedException {
- CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+ this.crunchSplit = (CrunchInputSplit) inputSplit;
+ if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
+ combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+ }
+ this.context = context;
Configuration conf = crunchSplit.getConf();
if (conf == null) {
conf = context.getConfiguration();
crunchSplit.setConf(conf);
}
- InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
+ initNextRecordReader();
+ }
+
+ private boolean initNextRecordReader() throws IOException, InterruptedException {
+ if (combineFileSplit != null) {
+ if (curReader != null) {
+ curReader.close();
+ curReader = null;
+ if (idx > 0) {
+ progress += combineFileSplit.getLength(idx - 1);
+ }
+ }
+ // if all chunks have been processed, nothing more to do.
+ if (idx == combineFileSplit.getNumPaths()) {
+ return false;
+ }
+ } else if (idx > 0) {
+ return false;
+ }
+
+ idx++;
+ Configuration conf = crunchSplit.getConf();
+ InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
+ crunchSplit.getInputFormatClass(),
conf);
- this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
+ this.curReader = inputFormat.createRecordReader(getDelegateSplit(),
TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
+ return true;
+ }
+
+ private InputSplit getDelegateSplit() throws IOException {
+ if (combineFileSplit != null) {
+ return new FileSplit(combineFileSplit.getPath(idx - 1),
+ combineFileSplit.getOffset(idx - 1),
+ combineFileSplit.getLength(idx - 1),
+ combineFileSplit.getLocations());
+ } else {
+ return crunchSplit.getInputSplit();
+ }
}
@Override
public void close() throws IOException {
- delegate.close();
+ if (curReader != null) {
+ curReader.close();
+ curReader = null;
+ }
}
@Override
public K getCurrentKey() throws IOException, InterruptedException {
- return delegate.getCurrentKey();
+ return curReader.getCurrentKey();
}
@Override
public V getCurrentValue() throws IOException, InterruptedException {
- return delegate.getCurrentValue();
+ return curReader.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
- return delegate.getProgress();
+ float curProgress = 0; // bytes processed in current split
+ if (null != curReader) {
+ curProgress = (float)(curReader.getProgress() * getCurLength());
+ }
+ return Math.min(1.0f, (progress + curProgress)/getOverallLength());
+ }
+
+ private long getCurLength() {
+ if (combineFileSplit == null) {
+ return 1L;
+ } else {
+ return combineFileSplit.getLength(idx - 1);
+ }
+ }
+
+ private float getOverallLength() {
+ if (combineFileSplit == null) {
+ return 1.0f;
+ } else {
+ return (float) combineFileSplit.getLength();
+ }
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
- CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+ this.crunchSplit = (CrunchInputSplit) inputSplit;
+ this.context = context;
Configuration conf = crunchSplit.getConf();
if (conf == null) {
conf = context.getConfiguration();
+ crunchSplit.setConf(conf);
+ }
+ if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
+ combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+ }
+ if (curReader != null) {
+ curReader.initialize(getDelegateSplit(),
+ TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
}
- InputSplit delegateSplit = crunchSplit.getInputSplit();
- delegate.initialize(delegateSplit,
- TaskAttemptContextFactory.create(conf, context.getTaskAttemptID()));
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- return delegate.nextKeyValue();
+ while ((curReader == null) || !curReader.nextKeyValue()) {
+ if (!initNextRecordReader()) {
+ return false;
+ }
+ if (curReader != null) {
+ curReader.initialize(getDelegateSplit(),
+ TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
+ }
+ }
+ return true;
}
-
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 604c49c..8912897 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -22,14 +22,14 @@ package org.apache.crunch.impl.mr.run;
*/
public class RuntimeParameters {
- public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-
public static final String DEBUG = "crunch.debug";
public static final String TMP_DIR = "crunch.tmp.dir";
public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
+ public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file";
+
public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
// Not instantiated
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 13645ba..a3cbdc8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
import org.apache.crunch.io.CompositePathIterable;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FileReaderFactory;
@@ -91,19 +92,17 @@ public class FileSourceImpl<T> implements Source<T> {
@Override
public void configureSource(Job job, int inputId) throws IOException {
- if (inputId == -1) {
- for (Path path : paths) {
- FileInputFormat.addInputPath(job, path);
- }
- job.setInputFormatClass(inputBundle.getFormatClass());
- inputBundle.configure(job.getConfiguration());
- } else {
- for (Path path : paths) {
- CrunchInputs.addInputPath(job, path, inputBundle, inputId);
- }
+ // Use Crunch to handle the combined input splits
+ job.setInputFormatClass(CrunchInputFormat.class);
+ for (Path path : paths) {
+ CrunchInputs.addInputPath(job, path, inputBundle, inputId);
}
}
+ public FormatBundle<? extends InputFormat> getBundle() {
+ return inputBundle;
+ }
+
@Override
public PType<T> getType() {
return ptype;
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index abef771..0756b70 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -20,6 +20,8 @@ package org.apache.crunch.io.text;
import java.io.IOException;
import java.util.List;
+
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.impl.FileSourceImpl;
@@ -39,6 +41,7 @@ public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSou
private static FormatBundle getBundle(int linesPerTask) {
FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class);
bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask));
+ bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
return bundle;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b7781ca0/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
index ceef2b2..a8ee398 100644
--- a/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -26,6 +26,8 @@ import java.io.IOException;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.types.avro.AvroType;
@@ -58,7 +60,8 @@ public class AvroFileSourceTest {
AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(tempFile.getAbsolutePath()),
avroSpecificType);
- personFileSource.configureSource(job, -1);
+ FormatBundle bundle = personFileSource.getBundle();
+ bundle.configure(job.getConfiguration());
assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
@@ -70,7 +73,8 @@ public class AvroFileSourceTest {
AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(tempFile.getAbsolutePath()),
avroGenericType);
- personFileSource.configureSource(job, -1);
+ FormatBundle bundle = personFileSource.getBundle();
+ bundle.configure(job.getConfiguration());
assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true));
@@ -82,8 +86,8 @@ public class AvroFileSourceTest {
AvroFileSource<StringWrapper> personFileSource = new AvroFileSource<StringWrapper>(new Path(
tempFile.getAbsolutePath()), avroReflectType);
- personFileSource.configureSource(job, -1);
-
+ FormatBundle bundle = personFileSource.getBundle();
+ bundle.configure(job.getConfiguration());
assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
}