You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/05/29 05:16:19 UTC
git commit: CRUNCH-354: Have CrunchInputSplit implement
Supplier<InputSplit> to provide access to the base split
Repository: crunch
Updated Branches:
refs/heads/master a62a24b6f -> dc03c9133
CRUNCH-354: Have CrunchInputSplit implement Supplier<InputSplit> to provide access to the base split
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dc03c913
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dc03c913
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dc03c913
Branch: refs/heads/master
Commit: dc03c913318d2853eed3751886fe45bdfdf7dfa4
Parents: a62a24b
Author: Josh Wills <jw...@apache.org>
Authored: Tue May 27 14:04:24 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Wed May 28 20:14:39 2014 -0700
----------------------------------------------------------------------
.../org/apache/crunch/impl/mr/run/CrunchInputSplit.java | 12 +++++++-----
.../apache/crunch/impl/mr/run/CrunchRecordReader.java | 10 +++++-----
2 files changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/dc03c913/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index 02942bc..ec255d9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
+import com.google.common.base.Supplier;
import org.apache.crunch.io.FormatBundle;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.util.ReflectionUtils;
-class CrunchInputSplit extends InputSplit implements Writable, Configurable {
+class CrunchInputSplit extends InputSplit implements Writable, Configurable, Supplier<InputSplit> {
private InputSplit inputSplit;
private int nodeIndex;
@@ -58,6 +59,11 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
}
@Override
+ public InputSplit get() {
+ return inputSplit;
+ }
+
+ @Override
public void setConf(Configuration conf) {
this.conf = new Configuration(conf);
if (bundle != null && conf != null) {
@@ -74,10 +80,6 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
return nodeIndex;
}
- public InputSplit getInputSplit() {
- return inputSplit;
- }
-
public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
return bundle.getFormatClass();
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/dc03c913/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index 32b3f74..e475f10 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -41,8 +41,8 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
InterruptedException {
this.crunchSplit = (CrunchInputSplit) inputSplit;
- if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
- combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+ if (crunchSplit.get() instanceof CombineFileSplit) {
+ combineFileSplit = (CombineFileSplit) crunchSplit.get();
}
this.context = context;
Configuration conf = crunchSplit.getConf();
@@ -87,7 +87,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
combineFileSplit.getLength(idx - 1),
combineFileSplit.getLocations());
} else {
- return crunchSplit.getInputSplit();
+ return crunchSplit.get();
}
}
@@ -143,8 +143,8 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
conf = context.getConfiguration();
crunchSplit.setConf(conf);
}
- if (crunchSplit.getInputSplit() instanceof CombineFileSplit) {
- combineFileSplit = (CombineFileSplit) crunchSplit.getInputSplit();
+ if (crunchSplit.get() instanceof CombineFileSplit) {
+ combineFileSplit = (CombineFileSplit) crunchSplit.get();
}
if (curReader != null) {
curReader.initialize(getDelegateSplit(),