You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ch...@apache.org on 2012/11/30 05:41:41 UTC

git commit: SQOOP-721 Duplicating rows on export when exporting from compressed files.

Updated Branches:
  refs/heads/trunk 33a7a8141 -> a840f41fc


SQOOP-721 Duplicating rows on export when exporting from compressed files.

(Jarek Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a840f41f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a840f41f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a840f41f

Branch: refs/heads/trunk
Commit: a840f41fc76688de6ab0d61e2cba3af32b2a9a96
Parents: 33a7a81
Author: Cheolsoo Park <ch...@apache.org>
Authored: Thu Nov 29 20:40:25 2012 -0800
Committer: Cheolsoo Park <ch...@apache.org>
Committed: Thu Nov 29 20:40:25 2012 -0800

----------------------------------------------------------------------
 .../sqoop/mapreduce/CombineFileInputFormat.java    |  202 +++++++++++----
 .../sqoop/mapreduce/CombineFileRecordReader.java   |   32 ++--
 .../apache/sqoop/mapreduce/CombineFileSplit.java   |   61 +++--
 3 files changed, 195 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
index 4f58d7d..7d2be38 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -42,12 +45,19 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+// CHECKSTYLE:OFF
 
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
  * {@link InputFormat#getSplits(JobContext)} method.
  *
- * Splits are constructed from the files under the input path
+ * Splits are constructed from the files under the input paths.
  * A split cannot have files from different pools.
  * Each split returned may contain blocks from different files.
  * If a maxSplitSize is specified, then blocks on the same node are
@@ -69,6 +79,14 @@ import org.apache.hadoop.net.NetworkTopology;
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
+  public static final Log LOG =
+    LogFactory.getLog(CombineFileInputFormat.class.getName());
+
+
+  public static final String SPLIT_MINSIZE_PERNODE =
+    "mapreduce.input.fileinputformat.split.minsize.per.node";
+  public static final String SPLIT_MINSIZE_PERRACK =
+    "mapreduce.input.fileinputformat.split.minsize.per.rack";
   // ability to limit the size of a single split
   private long maxSplitSize = 0;
   private long minSplitSizeNode = 0;
@@ -85,8 +103,8 @@ public abstract class CombineFileInputFormat<K, V>
    * Specify the maximum size (in bytes) of each split. Each split is
    * approximately equal to the specified size.
    */
-  protected void setMaxSplitSize(long val) {
-    this.maxSplitSize = val;
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
   }
 
   /**
@@ -96,8 +114,8 @@ public abstract class CombineFileInputFormat<K, V>
    * This leftover data will be combined into its own split if its size
    * exceeds minSplitSizeNode.
    */
-  protected void setMinSplitSizeNode(long val) {
-    this.minSplitSizeNode = val;
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
   }
 
   /**
@@ -107,8 +125,8 @@ public abstract class CombineFileInputFormat<K, V>
    * This leftover data will be combined into its own split if its size
    * exceeds minSplitSizeRack.
    */
-  protected void setMinSplitSizeRack(long val) {
-    this.minSplitSizeRack = val;
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
   }
 
   /**
@@ -132,8 +150,21 @@ public abstract class CombineFileInputFormat<K, V>
     pools.add(multi);
   }
 
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+
+    // Once we remove support for Hadoop < 2.0
+    //return codec instanceof SplittableCompressionCodec;
+    return false;
+  }
+
   /**
-   * default constructor.
+   * default constructor
    */
   public CombineFileInputFormat() {
   }
@@ -152,12 +183,12 @@ public abstract class CombineFileInputFormat<K, V>
     if (minSplitSizeNode != 0) {
       minSizeNode = minSplitSizeNode;
     } else {
-      minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
+      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
     }
     if (minSplitSizeRack != 0) {
       minSizeRack = minSplitSizeRack;
     } else {
-      minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
+      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
     }
     if (maxSplitSize != 0) {
       maxSize = maxSplitSize;
@@ -165,19 +196,19 @@ public abstract class CombineFileInputFormat<K, V>
       maxSize = conf.getLong("mapred.max.split.size", 0);
     }
     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-      throw new IOException("Minimum split size pernode " + minSizeNode
-                            + " cannot be larger than maximum split size "
-                            + maxSize);
+      throw new IOException("Minimum split size pernode " + minSizeNode +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
     }
     if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-      throw new IOException("Minimum split size per rack" + minSizeRack
-                            + " cannot be larger than maximum split size "
-                            + maxSize);
+      throw new IOException("Minimum split size per rack" + minSizeRack +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
     }
     if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-      throw new IOException("Minimum split size per node" + minSizeNode
-                            + " cannot be smaller than minimum split "
-                            + "size per rack " + minSizeRack);
+      throw new IOException("Minimum split size per node" + minSizeNode +
+                            " cannot be smaller than minimum split " +
+                            "size per rack " + minSizeRack);
     }
 
     // all the files in input set
@@ -214,12 +245,12 @@ public abstract class CombineFileInputFormat<K, V>
         }
       }
       // create splits for all files in this pool.
-      getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]),
+      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
                     maxSize, minSizeNode, minSizeRack, splits);
     }
 
     // create splits for all files that are not in any pool.
-    getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
                   maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
@@ -228,13 +259,13 @@ public abstract class CombineFileInputFormat<K, V>
   }
 
   /**
-   * Return all the splits in the specified set of paths.
+   * Return all the splits in the specified set of paths
    */
-  // CHECKSTYLE:OFF
-  private void getMoreSplits(Configuration conf, Path[] paths,
+  private void getMoreSplits(JobContext job, Path[] paths,
                              long maxSize, long minSizeNode, long minSizeRack,
                              List<InputSplit> splits)
     throws IOException {
+    Configuration conf = job.getConfiguration();
 
     // all blocks for all the files in input set
     OneFileInfo[] files;
@@ -259,13 +290,14 @@ public abstract class CombineFileInputFormat<K, V>
     // populate all the blocks for all files
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, rackToBlocks, blockToNodes,
-                                      nodeToBlocks, rackToNodes);
+      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
 
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
+    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
 
     // process all nodes and create splits that are local
@@ -319,7 +351,7 @@ public abstract class CombineFileInputFormat<K, V>
     // in 'overflow'. After the processing of all racks is complete, these
     // overflow blocks will be combined into splits.
     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
+    Set<String> racks = new HashSet<String>();
 
     // Process all racks over and over again until there is no more work to do.
     while (blockToNodes.size() > 0) {
@@ -375,8 +407,8 @@ public abstract class CombineFileInputFormat<K, V>
             addCreatedSplit(splits, getHosts(racks), validBlocks);
           } else {
             // There were a few blocks in this rack that
-            // remained to be processed. Keep them in 'overflow' block list.
-            // These will be combined later.
+        	// remained to be processed. Keep them in 'overflow' block list.
+        	// These will be combined later.
             overflowBlocks.addAll(validBlocks);
           }
         }
@@ -418,14 +450,13 @@ public abstract class CombineFileInputFormat<K, V>
       addCreatedSplit(splits, getHosts(racks), validBlocks);
     }
   }
-  // CHECKSTYLE:ON
 
   /**
    * Create a single split from the list of blocks specified in validBlocks
    * Add this new split into splitList.
    */
   private void addCreatedSplit(List<InputSplit> splitList,
-                               List<String> locations,
+                               Collection<String> locations,
                                ArrayList<OneBlockInfo> validBlocks) {
     // create an input split
     Path[] fl = new Path[validBlocks.size()];
@@ -450,17 +481,19 @@ public abstract class CombineFileInputFormat<K, V>
       TaskAttemptContext context) throws IOException;
 
   /**
-   * information about one file from the File System.
+   * information about one file from the File System
    */
   private static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
     OneFileInfo(Path path, Configuration conf,
+                boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
                 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                HashMap<String, Set<String>> rackToNodes)
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
                 throws IOException {
       this.fileSize = 0;
 
@@ -473,32 +506,82 @@ public abstract class CombineFileInputFormat<K, V>
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
 
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path,
-                                       locations[i].getOffset(),
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths());
-          blocks[i] = oneblock;
+        if(locations.length == 0) {
+          locations = new BlockLocation[] { new BlockLocation() };
+        }
 
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            do {
+              if (maxSize == 0) {
+                myLength = left;
+              } else {
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if remainder is between max and 2*max - then
+                  // instead of creating splits of size max, left-max we
+                  // create splits of size left/2 and left/2. This is
+                  // a heuristic to avoid creating really really small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
+              }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
+
+              blocksList.add(oneblock);
+            } while (left > 0);
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
           // add this block to the block --> node locations map
           blockToNodes.put(oneblock, oneblock.hosts);
 
+          // For blocks that do not have host/rack information,
+          // assign to default  rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
           // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
             List<OneBlockInfo> blklist = rackToBlocks.get(rack);
             if (blklist == null) {
               blklist = new ArrayList<OneBlockInfo>();
               rackToBlocks.put(rack, blklist);
             }
             blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
-         }
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
+          }
 
           // add this block to the node --> block map
           for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -524,16 +607,14 @@ public abstract class CombineFileInputFormat<K, V>
   }
 
   /**
-   * information about one block from the File System.
+   * information about one block from the File System
    */
   private static class OneBlockInfo {
-    // CHECKSTYLE:OFF
     Path onepath;                // name of this file
     long offset;                 // offset in file
     long length;                 // length of this block
     String[] hosts;              // nodes on which this block resides
     String[] racks;              // network topology of hosts
-    // CHECKSTYLE:ON
 
     OneBlockInfo(Path path, long offset, long len,
                  String[] hosts, String[] topologyPaths) {
@@ -541,8 +622,8 @@ public abstract class CombineFileInputFormat<K, V>
       this.offset = offset;
       this.hosts = hosts;
       this.length = len;
-      assert (hosts.length == topologyPaths.length
-               || topologyPaths.length == 0);
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
 
       // if the file system does not have any rack information, then
       // use dummy rack location.
@@ -563,6 +644,11 @@ public abstract class CombineFileInputFormat<K, V>
     }
   }
 
+  protected BlockLocation[] getFileBlockLocations(
+    FileSystem fs, FileStatus stat) throws IOException {
+    return fs.getFileBlockLocations(stat, 0, stat.getLen());
+  }
+
   private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
                                     String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
@@ -573,10 +659,12 @@ public abstract class CombineFileInputFormat<K, V>
     hosts.add(host);
   }
 
-  private List<String> getHosts(List<String> racks) {
-    List<String> hosts = new ArrayList<String>();
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
     for (String rack : racks) {
-      hosts.addAll(rackToNodes.get(rack));
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
     }
     return hosts;
   }
@@ -621,3 +709,5 @@ public abstract class CombineFileInputFormat<K, V>
     }
   }
 }
+
+// CHECKSTYLE:ON

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
index 6f9864b..8d4666c 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.conf.Configuration;
 
 /**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+// CHECKSTYLE:OFF
+
+/**
  * A generic RecordReader that can hand out different recordReaders
  * for each chunk in a {@link CombineFileSplit}.
  * A CombineFileSplit can combine data chunks from multiple files.
@@ -34,19 +39,16 @@ import org.apache.hadoop.conf.Configuration;
  * these data chunks from different files.
  * @see CombineFileSplit
  */
-
 public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
 
-  // CHECKSTYLE:OFF
   static final Class [] constructorSignature = new Class []
                                          {CombineFileSplit.class,
                                           TaskAttemptContext.class,
                                           Integer.class};
-  // CHECKSTYLE:ON
 
   protected CombineFileSplit split;
-  protected Class<? extends RecordReader<K, V>> rrClass;
-  protected Constructor<? extends RecordReader<K, V>> rrConstructor;
+  protected Class<? extends RecordReader<K,V>> rrClass;
+  protected Constructor<? extends RecordReader<K,V>> rrConstructor;
   protected FileSystem fs;
   protected TaskAttemptContext context;
 
@@ -54,10 +56,10 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
   protected long progress;
   protected RecordReader<K, V> curReader;
 
-  public void initialize(InputSplit psplit,
-      TaskAttemptContext pcontext) throws IOException, InterruptedException {
-    this.split = (CombineFileSplit)psplit;
-    this.context = pcontext;
+  public void initialize(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    this.split = (CombineFileSplit)split;
+    this.context = context;
     if (null != this.curReader) {
       this.curReader.initialize(split, context);
     }
@@ -106,7 +108,7 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
    */
   public CombineFileRecordReader(CombineFileSplit split,
                                  TaskAttemptContext context,
-                                 Class<? extends RecordReader<K, V>> rrClass)
+                                 Class<? extends RecordReader<K,V>> rrClass)
     throws IOException {
     this.split = split;
     this.context = context;
@@ -119,8 +121,8 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
       rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
       rrConstructor.setAccessible(true);
     } catch (Exception e) {
-      throw new RuntimeException(rrClass.getName()
-                                 + " does not have valid constructor", e);
+      throw new RuntimeException(rrClass.getName() +
+                                 " does not have valid constructor", e);
     }
     initNextRecordReader();
   }
@@ -145,9 +147,6 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
 
     // get a record reader for the idx-th chunk
     try {
-      curReader =  rrConstructor.newInstance(new Object []
-                            {split, context, Integer.valueOf(idx)});
-
       Configuration conf = context.getConfiguration();
       // setup some helper config variables.
       conf.set("map.input.file", split.getPath(idx).toString());
@@ -163,9 +162,10 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
         curReader.initialize(split, context);
       }
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException (e);
     }
     idx++;
     return true;
   }
 }
+// CHECKSTYLE:ON

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a840f41f/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
index 8cf4d54..956e23b 100644
--- a/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
+++ b/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
@@ -28,16 +28,20 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
+ * This file was ported from Hadoop 2.0.2-alpha
+ */
+ // CHECKSTYLE:OFF
+
+/**
  * A sub-collection of input files.
  *
- * Unlike {@link FileSplit}, CombineFileSplit class does not represent
+ * Unlike FileSplit, CombineFileSplit class does not represent
  * a split of a file, but a split of input files into smaller sets.
  * A split may contain blocks from different file but all
  * the blocks in the same split are probably local to some rack <br>
  * CombineFileSplit can be used to implement {@link RecordReader}'s,
  * with reading one record per file.
  *
- * @see FileSplit
  * @see CombineFileInputFormat
  */
 public class CombineFileSplit extends InputSplit implements Writable {
@@ -49,7 +53,7 @@ public class CombineFileSplit extends InputSplit implements Writable {
   private long totLength;
 
   /**
-   * default constructor.
+   * default constructor
    */
   public CombineFileSplit() {}
   public CombineFileSplit(Path[] files, long[] start,
@@ -58,31 +62,31 @@ public class CombineFileSplit extends InputSplit implements Writable {
   }
 
   public CombineFileSplit(Path[] files, long[] lengths) {
-    long[] pstartoffset = new long[files.length];
-    for (int i = 0; i < pstartoffset.length; i++) {
-      pstartoffset[i] = 0;
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
     }
-    String[] plocations = new String[files.length];
-    for (int i = 0; i < plocations.length; i++) {
-      plocations[i] = "";
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
     }
-    initSplit(files, pstartoffset, lengths, plocations);
+    initSplit(files, startoffset, lengths, locations);
   }
 
   private void initSplit(Path[] files, long[] start,
-                         long[] plengths, String[] plocations) {
+                         long[] lengths, String[] locations) {
     this.startoffset = start;
-    this.lengths = plengths;
+    this.lengths = lengths;
     this.paths = files;
     this.totLength = 0;
-    this.locations = plocations;
+    this.locations = locations;
     for(long length : lengths) {
       totLength += length;
     }
   }
 
   /**
-   * Copy constructor.
+   * Copy constructor
    */
   public CombineFileSplit(CombineFileSplit old) throws IOException {
     this(old.getPaths(), old.getStartOffsets(),
@@ -93,42 +97,42 @@ public class CombineFileSplit extends InputSplit implements Writable {
     return totLength;
   }
 
-  /** Returns an array containing the start offsets of the files in the split.*/
+  /** Returns an array containing the start offsets of the files in the split*/
   public long[] getStartOffsets() {
     return startoffset;
   }
 
-  /** Returns an array containing the lengths of the files in the split.*/
+  /** Returns an array containing the lengths of the files in the split*/
   public long[] getLengths() {
     return lengths;
   }
 
-  /** Returns the start offset of the i<sup>th</sup> Path.*/
+  /** Returns the start offset of the i<sup>th</sup> Path */
   public long getOffset(int i) {
     return startoffset[i];
   }
 
-  /** Returns the length of the i<sup>th</sup> Path. */
+  /** Returns the length of the i<sup>th</sup> Path */
   public long getLength(int i) {
     return lengths[i];
   }
 
-  /** Returns the number of Paths in the split.*/
+  /** Returns the number of Paths in the split */
   public int getNumPaths() {
     return paths.length;
   }
 
-  /** Returns the i<sup>th</sup> Path. */
+  /** Returns the i<sup>th</sup> Path */
   public Path getPath(int i) {
     return paths[i];
   }
 
-  /** Returns all the Paths in the split. */
+  /** Returns all the Paths in the split */
   public Path[] getPaths() {
     return paths;
   }
 
-  /** Returns all the Paths where this input-split resides. */
+  /** Returns all the Paths where this input-split resides */
   public String[] getLocations() throws IOException {
     return locations;
   }
@@ -137,17 +141,17 @@ public class CombineFileSplit extends InputSplit implements Writable {
     totLength = in.readLong();
     int arrLength = in.readInt();
     lengths = new long[arrLength];
-    for(int i=0; i<arrLength; i++) {
+    for(int i=0; i<arrLength;i++) {
       lengths[i] = in.readLong();
     }
     int filesLength = in.readInt();
     paths = new Path[filesLength];
-    for(int i=0; i<filesLength; i++) {
+    for(int i=0; i<filesLength;i++) {
       paths[i] = new Path(Text.readString(in));
     }
     arrLength = in.readInt();
     startoffset = new long[arrLength];
-    for(int i=0; i<arrLength; i++) {
+    for(int i=0; i<arrLength;i++) {
       startoffset[i] = in.readLong();
     }
   }
@@ -172,11 +176,11 @@ public class CombineFileSplit extends InputSplit implements Writable {
  public String toString() {
     StringBuffer sb = new StringBuffer();
     for (int i = 0; i < paths.length; i++) {
-      if (i == 0) {
+      if (i == 0 ) {
         sb.append("Paths:");
       }
-      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i]
-                + "+" + lengths[i]);
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
+                "+" + lengths[i]);
       if (i < paths.length -1) {
         sb.append(",");
       }
@@ -193,3 +197,4 @@ public class CombineFileSplit extends InputSplit implements Writable {
     return sb.toString();
   }
 }
+// CHECKSTYLE:ON