Posted to commits@sqoop.apache.org by ch...@apache.org on 2012/11/30 05:41:41 UTC
git commit: SQOOP-721 Duplicating rows on export when exporting from compressed files.
Updated Branches:
refs/heads/trunk 33a7a8141 -> a840f41fc
SQOOP-721 Duplicating rows on export when exporting from compressed files.
(Jarek Jarcec Cecho via Cheolsoo Park)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a840f41f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a840f41f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a840f41f
Branch: refs/heads/trunk
Commit: a840f41fc76688de6ab0d61e2cba3af32b2a9a96
Parents: 33a7a81
Author: Cheolsoo Park <ch...@apache.org>
Authored: Thu Nov 29 20:40:25 2012 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Thu Nov 29 20:40:25 2012 -0800
----------------------------------------------------------------------
.../sqoop/mapreduce/CombineFileInputFormat.java | 202 +++++++++++----
.../sqoop/mapreduce/CombineFileRecordReader.java | 32 ++--
.../apache/sqoop/mapreduce/CombineFileSplit.java | 61 +++--
3 files changed, 195 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
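For context on the fix: a codec-compressed file (gzip and friends) can only be
decoded from the beginning of the stream, so carving it into multiple
block-sized chunks lets the same records be read by more than one task, which
is how the duplicated rows on export arose. The ported Hadoop 2.0.2-alpha
CombineFileInputFormat adds an isSplitable() override that treats any file
with a registered compression codec as non-splittable, so it always lands in a
single split. A minimal standalone sketch of that codec check (the class name
and sample paths are ours, for illustration only; only
CompressionCodecFactory.getCodec() comes from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class SplitabilityCheck {
  public static void main(String[] args) {
    CompressionCodecFactory factory =
        new CompressionCodecFactory(new Configuration());
    // Sample paths are hypothetical; the factory resolves a codec by
    // file extension (.gz, .bz2, .deflate, ...).
    for (String name : new String[] {"part-00000", "part-00000.gz"}) {
      CompressionCodec codec = factory.getCodec(new Path(name));
      // Mirrors the new isSplitable(): no codec means splittable; any
      // codec is treated as non-splittable by this port.
      System.out.println(name + " splittable: " + (codec == null));
    }
  }
}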
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index 4f58d7d..7d2be38 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
@@ -35,6 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -42,12 +45,19 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+// CHECKSTYLE:OFF
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
* {@link InputFormat#getSplits(JobContext)} method.
*
- * Splits are constructed from the files under the input path
+ * Splits are constructed from the files under the input paths.
* A split cannot have files from different pools.
* Each split returned may contain blocks from different files.
* If a maxSplitSize is specified, then blocks on the same node are
@@ -69,6 +79,14 @@ import org.apache.hadoop.net.NetworkTopology;
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
+ public static final Log LOG =
+ LogFactory.getLog(CombineFileInputFormat.class.getName());
+
+
+ public static final String SPLIT_MINSIZE_PERNODE =
+ "mapreduce.input.fileinputformat.split.minsize.per.node";
+ public static final String SPLIT_MINSIZE_PERRACK =
+ "mapreduce.input.fileinputformat.split.minsize.per.rack";
// ability to limit the size of a single split
private long maxSplitSize = 0;
private long minSplitSizeNode = 0;
@@ -85,8 +103,8 @@ public abstract class CombineFileInputFormat<K, V>
* Specify the maximum size (in bytes) of each split. Each split is
* approximately equal to the specified size.
*/
- protected void setMaxSplitSize(long val) {
- this.maxSplitSize = val;
+ protected void setMaxSplitSize(long maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
}
/**
@@ -96,8 +114,8 @@ public abstract class CombineFileInputFormat<K, V>
* This leftover data will be combined into its own split if its size
* exceeds minSplitSizeNode.
*/
- protected void setMinSplitSizeNode(long val) {
- this.minSplitSizeNode = val;
+ protected void setMinSplitSizeNode(long minSplitSizeNode) {
+ this.minSplitSizeNode = minSplitSizeNode;
}
/**
@@ -107,8 +125,8 @@ public abstract class CombineFileInputFormat<K, V>
* This leftover data will be combined into its own split if its size
* exceeds minSplitSizeRack.
*/
- protected void setMinSplitSizeRack(long val) {
- this.minSplitSizeRack = val;
+ protected void setMinSplitSizeRack(long minSplitSizeRack) {
+ this.minSplitSizeRack = minSplitSizeRack;
}
/**
@@ -132,8 +150,21 @@ public abstract class CombineFileInputFormat<K, V>
pools.add(multi);
}
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ final CompressionCodec codec =
+ new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+
+ // Once we remove support for Hadoop < 2.0
+ //return codec instanceof SplittableCompressionCodec;
+ return false;
+ }
+
/**
- * default constructor.
+ * default constructor
*/
public CombineFileInputFormat() {
}
@@ -152,12 +183,12 @@ public abstract class CombineFileInputFormat<K, V>
if (minSplitSizeNode != 0) {
minSizeNode = minSplitSizeNode;
} else {
- minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
+ minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
}
if (minSplitSizeRack != 0) {
minSizeRack = minSplitSizeRack;
} else {
- minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
+ minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
}
if (maxSplitSize != 0) {
maxSize = maxSplitSize;
@@ -165,19 +196,19 @@ public abstract class CombineFileInputFormat<K, V>
maxSize = conf.getLong("mapred.max.split.size", 0);
}
if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
- throw new IOException("Minimum split size pernode " + minSizeNode
- + " cannot be larger than maximum split size "
- + maxSize);
+ throw new IOException("Minimum split size pernode " + minSizeNode +
+ " cannot be larger than maximum split size " +
+ maxSize);
}
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
- throw new IOException("Minimum split size per rack" + minSizeRack
- + " cannot be larger than maximum split size "
- + maxSize);
+ throw new IOException("Minimum split size per rack" + minSizeRack +
+ " cannot be larger than maximum split size " +
+ maxSize);
}
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
- throw new IOException("Minimum split size per node" + minSizeNode
- + " cannot be smaller than minimum split "
- + "size per rack " + minSizeRack);
+ throw new IOException("Minimum split size per node" + minSizeNode +
+ " cannot be smaller than minimum split " +
+ "size per rack " + minSizeRack);
}
// all the files in input set
@@ -214,12 +245,12 @@ public abstract class CombineFileInputFormat<K, V>
}
}
// create splits for all files in this pool.
- getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]),
+ getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
- getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+ getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
@@ -228,13 +259,13 @@ public abstract class CombineFileInputFormat<K, V>
}
/**
- * Return all the splits in the specified set of paths.
+ * Return all the splits in the specified set of paths
*/
- // CHECKSTYLE:OFF
- private void getMoreSplits(Configuration conf, Path[] paths,
+ private void getMoreSplits(JobContext job, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
+ Configuration conf = job.getConfiguration();
// all blocks for all the files in input set
OneFileInfo[] files;
@@ -259,13 +290,14 @@ public abstract class CombineFileInputFormat<K, V>
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], conf, rackToBlocks, blockToNodes,
- nodeToBlocks, rackToNodes);
+ files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+ rackToBlocks, blockToNodes, nodeToBlocks,
+ rackToNodes, maxSize);
totLength += files[i].getLength();
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> nodes = new ArrayList<String>();
+ Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
@@ -319,7 +351,7 @@ public abstract class CombineFileInputFormat<K, V>
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> racks = new ArrayList<String>();
+ Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
@@ -375,8 +407,8 @@ public abstract class CombineFileInputFormat<K, V>
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
- // remained to be processed. Keep them in 'overflow' block list.
- // These will be combined later.
+ // remained to be processed. Keep them in 'overflow' block list.
+ // These will be combined later.
overflowBlocks.addAll(validBlocks);
}
}
@@ -418,14 +450,13 @@ public abstract class CombineFileInputFormat<K, V>
addCreatedSplit(splits, getHosts(racks), validBlocks);
}
}
- // CHECKSTYLE:ON
/**
* Create a single split from the list of blocks specified in validBlocks
* Add this new split into splitList.
*/
private void addCreatedSplit(List<InputSplit> splitList,
- List<String> locations,
+ Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
@@ -450,17 +481,19 @@ public abstract class CombineFileInputFormat<K, V>
TaskAttemptContext context) throws IOException;
/**
- * information about one file from the File System.
+ * information about one file from the File System
*/
private static class OneFileInfo {
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, Configuration conf,
+ boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<String, Set<String>> rackToNodes)
+ HashMap<String, Set<String>> rackToNodes,
+ long maxSize)
throws IOException {
this.fileSize = 0;
@@ -473,32 +506,82 @@ public abstract class CombineFileInputFormat<K, V>
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
- blocks = new OneBlockInfo[locations.length];
- for (int i = 0; i < locations.length; i++) {
- fileSize += locations[i].getLength();
- OneBlockInfo oneblock = new OneBlockInfo(path,
- locations[i].getOffset(),
- locations[i].getLength(),
- locations[i].getHosts(),
- locations[i].getTopologyPaths());
- blocks[i] = oneblock;
+ if(locations.length == 0) {
+ locations = new BlockLocation[] { new BlockLocation() };
+ }
+ if (!isSplitable) {
+ // if the file is not splitable, just create the one block with
+ // full file length
+ blocks = new OneBlockInfo[1];
+ fileSize = stat.getLen();
+ blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+ .getHosts(), locations[0].getTopologyPaths());
+ } else {
+ ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+ locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ fileSize += locations[i].getLength();
+
+ // each split can be a maximum of maxSize
+ long left = locations[i].getLength();
+ long myOffset = locations[i].getOffset();
+ long myLength = 0;
+ do {
+ if (maxSize == 0) {
+ myLength = left;
+ } else {
+ if (left > maxSize && left < 2 * maxSize) {
+ // if remainder is between max and 2*max - then
+ // instead of creating splits of size max, left-max we
+ // create splits of size left/2 and left/2. This is
+ // a heuristic to avoid creating really really small
+ // splits.
+ myLength = left / 2;
+ } else {
+ myLength = Math.min(maxSize, left);
+ }
+ }
+ OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+ myLength, locations[i].getHosts(), locations[i]
+ .getTopologyPaths());
+ left -= myLength;
+ myOffset += myLength;
+
+ blocksList.add(oneblock);
+ } while (left > 0);
+ }
+ blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+ }
+
+ for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
+
// add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
List<OneBlockInfo> blklist = rackToBlocks.get(rack);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
- }
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+ }
+ }
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -524,16 +607,14 @@ public abstract class CombineFileInputFormat<K, V>
}
/**
- * information about one block from the File System.
+ * information about one block from the File System
*/
private static class OneBlockInfo {
- // CHECKSTYLE:OFF
Path onepath; // name of this file
long offset; // offset in file
long length; // length of this block
String[] hosts; // nodes on which this block resides
String[] racks; // network topology of hosts
- // CHECKSTYLE:ON
OneBlockInfo(Path path, long offset, long len,
String[] hosts, String[] topologyPaths) {
@@ -541,8 +622,8 @@ public abstract class CombineFileInputFormat<K, V>
this.offset = offset;
this.hosts = hosts;
this.length = len;
- assert (hosts.length == topologyPaths.length
- || topologyPaths.length == 0);
+ assert (hosts.length == topologyPaths.length ||
+ topologyPaths.length == 0);
// if the file system does not have any rack information, then
// use dummy rack location.
@@ -563,6 +644,11 @@ public abstract class CombineFileInputFormat<K, V>
}
}
+ protected BlockLocation[] getFileBlockLocations(
+ FileSystem fs, FileStatus stat) throws IOException {
+ return fs.getFileBlockLocations(stat, 0, stat.getLen());
+ }
+
private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
@@ -573,10 +659,12 @@ public abstract class CombineFileInputFormat<K, V>
hosts.add(host);
}
- private List<String> getHosts(List<String> racks) {
- List<String> hosts = new ArrayList<String>();
+ private Set<String> getHosts(Set<String> racks) {
+ Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
- hosts.addAll(rackToNodes.get(rack));
+ if (rackToNodes.containsKey(rack)) {
+ hosts.addAll(rackToNodes.get(rack));
+ }
}
return hosts;
}
@@ -621,3 +709,5 @@ public abstract class CombineFileInputFormat<K, V>
}
}
}
+
+// CHECKSTYLE:ON
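The subtlest part of the new OneFileInfo logic above is the block-carving
loop: when the remaining bytes of a block fall between maxSize and 2 * maxSize,
the loop cuts them into two equal halves instead of maxSize plus a small tail.
A self-contained sketch of just that heuristic (class and method names are
ours, not part of the patch):

import java.util.ArrayList;
import java.util.List;

public class SplitSizing {
  // Returns the chunk lengths the new do/while loop would carve from a
  // single block of blockLength bytes under the given maxSize cap.
  static List<Long> chunkLengths(long blockLength, long maxSize) {
    List<Long> chunks = new ArrayList<Long>();
    long left = blockLength;
    do {
      long myLength;
      if (maxSize == 0) {
        myLength = left; // no cap: the whole block is one chunk
      } else if (left > maxSize && left < 2 * maxSize) {
        myLength = left / 2; // halve the tail to avoid a tiny split
      } else {
        myLength = Math.min(maxSize, left);
      }
      chunks.add(myLength);
      left -= myLength;
    } while (left > 0);
    return chunks;
  }

  public static void main(String[] args) {
    // A 300 MB block under a 128 MB cap becomes 128 + 86 + 86 MB
    // rather than 128 + 128 + 44 MB.
    System.out.println(chunkLengths(300L << 20, 128L << 20));
  }
}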
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
index 6f9864b..8d4666c 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.Configuration;
/**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+// CHECKSTYLE:OFF
+
+/**
* A generic RecordReader that can hand out different recordReaders
* for each chunk in a {@link CombineFileSplit}.
* A CombineFileSplit can combine data chunks from multiple files.
@@ -34,19 +39,16 @@ import org.apache.hadoop.conf.Configuration;
* these data chunks from different files.
* @see CombineFileSplit
*/
-
public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
- // CHECKSTYLE:OFF
static final Class [] constructorSignature = new Class []
{CombineFileSplit.class,
TaskAttemptContext.class,
Integer.class};
- // CHECKSTYLE:ON
protected CombineFileSplit split;
- protected Class<? extends RecordReader<K, V>> rrClass;
- protected Constructor<? extends RecordReader<K, V>> rrConstructor;
+ protected Class<? extends RecordReader<K,V>> rrClass;
+ protected Constructor<? extends RecordReader<K,V>> rrConstructor;
protected FileSystem fs;
protected TaskAttemptContext context;
@@ -54,10 +56,10 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
protected long progress;
protected RecordReader<K, V> curReader;
- public void initialize(InputSplit psplit,
- TaskAttemptContext pcontext) throws IOException, InterruptedException {
- this.split = (CombineFileSplit)psplit;
- this.context = pcontext;
+ public void initialize(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ this.split = (CombineFileSplit)split;
+ this.context = context;
if (null != this.curReader) {
this.curReader.initialize(split, context);
}
@@ -106,7 +108,7 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
*/
public CombineFileRecordReader(CombineFileSplit split,
TaskAttemptContext context,
- Class<? extends RecordReader<K, V>> rrClass)
+ Class<? extends RecordReader<K,V>> rrClass)
throws IOException {
this.split = split;
this.context = context;
@@ -119,8 +121,8 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
rrConstructor.setAccessible(true);
} catch (Exception e) {
- throw new RuntimeException(rrClass.getName()
- + " does not have valid constructor", e);
+ throw new RuntimeException(rrClass.getName() +
+ " does not have valid constructor", e);
}
initNextRecordReader();
}
@@ -145,9 +147,6 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
// get a record reader for the idx-th chunk
try {
- curReader = rrConstructor.newInstance(new Object []
- {split, context, Integer.valueOf(idx)});
-
Configuration conf = context.getConfiguration();
// setup some helper config variables.
conf.set("map.input.file", split.getPath(idx).toString());
@@ -163,9 +162,10 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
curReader.initialize(split, context);
}
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException (e);
}
idx++;
return true;
}
}
+// CHECKSTYLE:ON
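One detail worth calling out from this file: constructorSignature pins the
reflective contract, so any concrete reader handed to CombineFileRecordReader
must declare a (CombineFileSplit, TaskAttemptContext, Integer) constructor,
where the Integer selects which chunk of the combined split to read. A
skeleton of a conforming reader (the class name and stubbed method bodies are
ours, for illustration; only the constructor shape is dictated by the code
above):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.mapreduce.CombineFileSplit;

public class ChunkTextReader extends RecordReader<LongWritable, Text> {
  private final CombineFileSplit split;
  private final int idx; // which chunk of the combined split we own

  // Exactly the shape looked up via
  // rrClass.getDeclaredConstructor(constructorSignature).
  public ChunkTextReader(CombineFileSplit split, TaskAttemptContext context,
      Integer idx) {
    this.split = split;
    this.idx = idx.intValue();
  }

  @Override
  public void initialize(InputSplit s, TaskAttemptContext c)
      throws IOException, InterruptedException {
    // A real reader would open split.getPath(idx) here and respect
    // split.getOffset(idx) and split.getLength(idx).
  }

  @Override
  public boolean nextKeyValue() { return false; } // stub: emits no records

  @Override
  public LongWritable getCurrentKey() { return null; }

  @Override
  public Text getCurrentValue() { return null; }

  @Override
  public float getProgress() { return 1.0f; }

  @Override
  public void close() { }
}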
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
index 8cf4d54..956e23b 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
@@ -28,16 +28,20 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
/**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+ // CHECKSTYLE:OFF
+
+/**
* A sub-collection of input files.
*
- * Unlike {@link FileSplit}, CombineFileSplit class does not represent
+ * Unlike FileSplit, CombineFileSplit class does not represent
* a split of a file, but a split of input files into smaller sets.
* A split may contain blocks from different file but all
* the blocks in the same split are probably local to some rack <br>
* CombineFileSplit can be used to implement {@link RecordReader}'s,
* with reading one record per file.
*
- * @see FileSplit
* @see CombineFileInputFormat
*/
public class CombineFileSplit extends InputSplit implements Writable {
@@ -49,7 +53,7 @@ public class CombineFileSplit extends InputSplit implements Writable {
private long totLength;
/**
- * default constructor.
+ * default constructor
*/
public CombineFileSplit() {}
public CombineFileSplit(Path[] files, long[] start,
@@ -58,31 +62,31 @@ public class CombineFileSplit extends InputSplit implements Writable {
}
public CombineFileSplit(Path[] files, long[] lengths) {
- long[] pstartoffset = new long[files.length];
- for (int i = 0; i < pstartoffset.length; i++) {
- pstartoffset[i] = 0;
+ long[] startoffset = new long[files.length];
+ for (int i = 0; i < startoffset.length; i++) {
+ startoffset[i] = 0;
}
- String[] plocations = new String[files.length];
- for (int i = 0; i < plocations.length; i++) {
- plocations[i] = "";
+ String[] locations = new String[files.length];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = "";
}
- initSplit(files, pstartoffset, lengths, plocations);
+ initSplit(files, startoffset, lengths, locations);
}
private void initSplit(Path[] files, long[] start,
- long[] plengths, String[] plocations) {
+ long[] lengths, String[] locations) {
this.startoffset = start;
- this.lengths = plengths;
+ this.lengths = lengths;
this.paths = files;
this.totLength = 0;
- this.locations = plocations;
+ this.locations = locations;
for(long length : lengths) {
totLength += length;
}
}
/**
- * Copy constructor.
+ * Copy constructor
*/
public CombineFileSplit(CombineFileSplit old) throws IOException {
this(old.getPaths(), old.getStartOffsets(),
@@ -93,42 +97,42 @@ public class CombineFileSplit extends InputSplit implements Writable {
return totLength;
}
- /** Returns an array containing the start offsets of the files in the split.*/
+ /** Returns an array containing the start offsets of the files in the split*/
public long[] getStartOffsets() {
return startoffset;
}
- /** Returns an array containing the lengths of the files in the split.*/
+ /** Returns an array containing the lengths of the files in the split*/
public long[] getLengths() {
return lengths;
}
- /** Returns the start offset of the i<sup>th</sup> Path.*/
+ /** Returns the start offset of the i<sup>th</sup> Path */
public long getOffset(int i) {
return startoffset[i];
}
- /** Returns the length of the i<sup>th</sup> Path. */
+ /** Returns the length of the i<sup>th</sup> Path */
public long getLength(int i) {
return lengths[i];
}
- /** Returns the number of Paths in the split.*/
+ /** Returns the number of Paths in the split */
public int getNumPaths() {
return paths.length;
}
- /** Returns the i<sup>th</sup> Path. */
+ /** Returns the i<sup>th</sup> Path */
public Path getPath(int i) {
return paths[i];
}
- /** Returns all the Paths in the split. */
+ /** Returns all the Paths in the split */
public Path[] getPaths() {
return paths;
}
- /** Returns all the Paths where this input-split resides. */
+ /** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
@@ -137,17 +141,17 @@ public class CombineFileSplit extends InputSplit implements Writable {
totLength = in.readLong();
int arrLength = in.readInt();
lengths = new long[arrLength];
- for(int i=0; i<arrLength; i++) {
+ for(int i=0; i<arrLength;i++) {
lengths[i] = in.readLong();
}
int filesLength = in.readInt();
paths = new Path[filesLength];
- for(int i=0; i<filesLength; i++) {
+ for(int i=0; i<filesLength;i++) {
paths[i] = new Path(Text.readString(in));
}
arrLength = in.readInt();
startoffset = new long[arrLength];
- for(int i=0; i<arrLength; i++) {
+ for(int i=0; i<arrLength;i++) {
startoffset[i] = in.readLong();
}
}
@@ -172,11 +176,11 @@ public class CombineFileSplit extends InputSplit implements Writable {
public String toString() {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < paths.length; i++) {
- if (i == 0) {
+ if (i == 0 ) {
sb.append("Paths:");
}
- sb.append(paths[i].toUri().getPath() + ":" + startoffset[i]
- + "+" + lengths[i]);
+ sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+ "+" + lengths[i]);
if (i < paths.length -1) {
sb.append(",");
}
@@ -193,3 +197,4 @@ public class CombineFileSplit extends InputSplit implements Writable {
return sb.toString();
}
}
+// CHECKSTYLE:ON
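To close, a short usage sketch of the reworked CombineFileSplit (the demo
class and paths are ours; the two-argument constructor, getLength(),
getOffset(), and getNumPaths() come from the class above). It shows the
zero-filled offsets and the accumulated total length:

import org.apache.hadoop.fs.Path;
import org.apache.sqoop.mapreduce.CombineFileSplit;

public class CombineFileSplitDemo {
  public static void main(String[] args) throws Exception {
    Path[] files = { new Path("/data/a.gz"), new Path("/data/b.gz") };
    long[] lengths = { 1024L, 2048L };

    // The two-argument constructor zero-fills the start offsets and
    // uses empty location strings.
    CombineFileSplit split = new CombineFileSplit(files, lengths);

    System.out.println(split.getNumPaths()); // 2
    System.out.println(split.getOffset(1));  // 0
    System.out.println(split.getLength());   // 3072 = 1024 + 2048
  }
}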