Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 14:08:43 UTC

svn commit: r1522541 - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/io/orc/ shims/src/0.20/java/org/apache/hadoop/hive/shims/ shims/src/0.20S/java/org/apache/hadoop/hive/shims/ shims/src/0.23/java/...

Author: hashutosh
Date: Thu Sep 12 12:08:42 2013
New Revision: 1522541

URL: http://svn.apache.org/r1522541
Log:
HIVE-5102 : ORC getSplits should create splits based on the stripes (Owen Omalley via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
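
The new split generation below reads its bounds from the standard MapReduce
split-size keys (MIN_SPLIT_SIZE / MAX_SPLIT_SIZE, with defaults of 16MB and
256MB) instead of the tiny FileInputFormat minimum the old constructor set. A
minimal configuration sketch, assuming a JobConf that is otherwise already set
up for an ORC table:

    import org.apache.hadoop.mapred.JobConf;

    public class OrcSplitSizeExample {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // OrcInputFormat defaults: 16MB minimum, 256MB maximum per split.
        job.setLong("mapred.min.split.size", 32L * 1024 * 1024);   // 32MB
        job.setLong("mapred.max.split.size", 128L * 1024 * 1024);  // 128MB
        // getSplits(job, numSplits) will now merge adjacent ORC stripes into
        // splits between these bounds, preferring cuts at block boundaries.
      }
    }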

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Sep 12 12:08:42 2013
@@ -18,41 +18,64 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A MapReduce/Hive input format for ORC files.
  */
-public class OrcInputFormat  extends FileInputFormat<NullWritable, OrcStruct>
-  implements InputFormatChecker {
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+                                       InputFormatChecker {
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+  static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
+  static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+  private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
+  private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
+
+  /**
+   * When picking the hosts for a split that crosses block boundaries,
+   * drop any host that has fewer than MIN_INCLUDED_LOCATION times the
+   * number of bytes available on the host with the most.
+   * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB, the
+   * split will contain host2 (100% of the max) and host3 (90% of the max);
+   * host1, at 50%, will be dropped.
+   */
+  private static final double MIN_INCLUDED_LOCATION = 0.80;
 
   private static class OrcRecordReader
       implements RecordReader<NullWritable, OrcStruct> {
@@ -137,10 +160,12 @@ public class OrcInputFormat  extends Fil
     }
   }
 
-  public OrcInputFormat() {
-    // just set a really small lower bound
-    setMinSplitSize(16 * 1024);
-  }
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+    public boolean accept(Path p){
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
 
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
@@ -148,7 +173,7 @@ public class OrcInputFormat  extends Fil
    * @param result the global view of columns that should be included
    * @param typeId the root of tree to enable
    */
-  private static void includeColumnRecursive(List<OrcProto.Type> types,
+  static void includeColumnRecursive(List<OrcProto.Type> types,
                                              boolean[] result,
                                              int typeId) {
     result[typeId] = true;
@@ -165,7 +190,7 @@ public class OrcInputFormat  extends Fil
    * @param conf the configuration
    * @return true for each column that should be included
    */
-  private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
+  static boolean[] findIncludedColumns(List<OrcProto.Type> types,
                                                Configuration conf) {
     String includedStr =
         conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
@@ -220,4 +245,324 @@ public class OrcInputFormat  extends Fil
     }
     return true;
   }
+
+  /**
+   * Get the list of input {@link Path}s for the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @return the list of input {@link Path}s for the map-reduce job.
+   */
+  static Path[] getInputPaths(JobConf conf) throws IOException {
+    String dirs = conf.get("mapred.input.dir");
+    if (dirs == null) {
+      throw new IOException("Configuration mapred.input.dir is not defined.");
+    }
+    String [] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+  /**
+   * The global information about the split generation that we pass around to
+   * the different worker threads.
+   */
+  static class Context {
+    private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
+    private final List<FileSplit> splits = new ArrayList<FileSplit>(10000);
+    private final List<Throwable> errors = new ArrayList<Throwable>();
+    private final HadoopShims shims = ShimLoader.getHadoopShims();
+    private final Configuration conf;
+    private final long maxSize;
+    private final long minSize;
+
+    /**
+     * A count of the number of threads that may create more work for the
+     * thread pool.
+     */
+    private int schedulers = 0;
+
+    Context(Configuration conf) {
+      this.conf = conf;
+      minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
+      maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+    }
+
+    int getSchedulers() {
+      return schedulers;
+    }
+
+    /**
+     * Get the Nth split.
+     * @param index if index >= 0, count from the front, otherwise count from
+     *     the back.
+     * @return the Nth file split
+     */
+    FileSplit getResult(int index) {
+      if (index >= 0) {
+        return splits.get(index);
+      } else {
+        return splits.get(splits.size() + index);
+      }
+    }
+
+    List<Throwable> getErrors() {
+      return errors;
+    }
+
+    /**
+     * Add a unit of work.
+     * @param runnable the object to run
+     */
+    synchronized void schedule(Runnable runnable) {
+      if (runnable instanceof FileGenerator) {
+        schedulers += 1;
+      }
+      threadPool.execute(runnable);
+    }
+
+    /**
+     * Mark a worker that may generate more work as done.
+     */
+    synchronized void decrementSchedulers() {
+      schedulers -= 1;
+      if (schedulers == 0) {
+        notify();
+      }
+    }
+
+    /**
+     * Wait until all of the tasks are done. It waits until all of the
+     * threads that may create more work are done and then shuts down the
+     * thread pool and waits for the final threads to finish.
+     */
+    synchronized void waitForTasks() {
+      try {
+        while (schedulers != 0) {
+          wait();
+        }
+        threadPool.shutdown();
+        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      } catch (InterruptedException ie) {
+        throw new IllegalStateException("interrupted", ie);
+      }
+    }
+  }
+
+  /**
+   * Given a directory, get the list of files and blocks in those files.
+   * A thread is used for each directory.
+   */
+  static final class FileGenerator implements Runnable {
+    private final Context context;
+    private final FileSystem fs;
+    private final Path dir;
+
+    FileGenerator(Context context, FileSystem fs, Path dir) {
+      this.context = context;
+      this.fs = fs;
+      this.dir = dir;
+    }
+
+    /**
+     * For each path, get the list of files and the blocks they consist of.
+     */
+    @Override
+    public void run() {
+      try {
+        Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
+            hiddenFileFilter);
+        while (itr.hasNext()) {
+          FileStatus file = itr.next();
+          if (!file.isDir()) {
+            context.schedule(new SplitGenerator(context, fs, file));
+          }
+        }
+        // mark the fact that we are done
+        context.decrementSchedulers();
+      } catch (Throwable th) {
+        context.decrementSchedulers();
+        synchronized (context.errors) {
+          context.errors.add(th);
+        }
+      }
+    }
+  }
+
+  /**
+   * Split the stripes of a given file into input splits.
+   * A thread is used for each file.
+   */
+  static final class SplitGenerator implements Runnable {
+    private final Context context;
+    private final FileSystem fs;
+    private final FileStatus file;
+    private final long blockSize;
+    private final BlockLocation[] locations;
+
+    SplitGenerator(Context context, FileSystem fs,
+                   FileStatus file) throws IOException {
+      this.context = context;
+      this.fs = fs;
+      this.file = file;
+      this.blockSize = file.getBlockSize();
+      locations = context.shims.getLocations(fs, file);
+    }
+
+    Path getPath() {
+      return file.getPath();
+    }
+
+    @Override
+    public String toString() {
+      return "splitter(" + file.getPath() + ")";
+    }
+
+    /**
+     * Compute the number of bytes that overlap between the two ranges.
+     * @param offset1 start of range1
+     * @param length1 length of range1
+     * @param offset2 start of range2
+     * @param length2 length of range2
+     * @return the number of bytes in the overlap range
+     */
+    static long getOverlap(long offset1, long length1,
+                           long offset2, long length2) {
+      long end1 = offset1 + length1;
+      long end2 = offset2 + length2;
+      if (end2 <= offset1 || end1 <= offset2) {
+        return 0;
+      } else {
+        return Math.min(end1, end2) - Math.max(offset1, offset2);
+      }
+    }
+
+    /**
+     * Create an input split over the given range of bytes. The location of the
+     * split is based on where the majority of the bytes are coming from. ORC
+     * files are unlikely to have splits that cross between blocks because they
+     * are written with large block sizes.
+     * @param offset the start of the split
+     * @param length the length of the split
+     * @throws IOException
+     */
+    void createSplit(long offset, long length) throws IOException {
+      String[] hosts;
+      if ((offset % blockSize) + length <= blockSize) {
+        // handle the single block case
+        hosts = locations[(int) (offset / blockSize)].getHosts();
+      } else {
+        // Calculate the number of bytes in the split that are local to each
+        // host.
+        Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
+        long maxSize = 0;
+        for(BlockLocation block: locations) {
+          long overlap = getOverlap(offset, length, block.getOffset(),
+              block.getLength());
+          if (overlap > 0) {
+            for(String host: block.getHosts()) {
+              LongWritable val = sizes.get(host);
+              if (val == null) {
+                val = new LongWritable();
+                sizes.put(host, val);
+              }
+              val.set(val.get() + overlap);
+              maxSize = Math.max(maxSize, val.get());
+            }
+          }
+        }
+        // filter the list of locations to those that have at least 80% of the
+        // max
+        long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
+        List<String> hostList = new ArrayList<String>();
+        // build the locations in a predictable order to simplify testing
+        for(BlockLocation block: locations) {
+          for(String host: block.getHosts()) {
+            if (sizes.containsKey(host)) {
+              if (sizes.get(host).get() >= threshold) {
+                hostList.add(host);
+              }
+              sizes.remove(host);
+            }
+          }
+        }
+        hosts = new String[hostList.size()];
+        hostList.toArray(hosts);
+      }
+      synchronized (context.splits) {
+        context.splits.add(new FileSplit(file.getPath(), offset, length,
+            hosts));
+      }
+    }
+
+    /**
+     * Divide the adjacent stripes in the file into input splits based on the
+     * block size and the configured minimum and maximum sizes.
+     */
+    @Override
+    public void run() {
+      try {
+        Reader orcReader = OrcFile.createReader(fs, file.getPath());
+        long currentOffset = -1;
+        long currentLength = 0;
+        for(StripeInformation stripe: orcReader.getStripes()) {
+          // if we are building a split, it is over the minimum split size,
+          // and we have crossed a block boundary, cut the input split here.
+          if (currentOffset != -1 && currentLength > context.minSize &&
+              (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
+            createSplit(currentOffset, currentLength);
+            currentOffset = -1;
+          }
+          // if we aren't building a split, start a new one.
+          if (currentOffset == -1) {
+            currentOffset = stripe.getOffset();
+            currentLength = stripe.getLength();
+          } else {
+            currentLength += stripe.getLength();
+          }
+          if (currentLength >= context.maxSize) {
+            createSplit(currentOffset, currentLength);
+            currentOffset = -1;
+          }
+        }
+        if (currentOffset != -1) {
+          createSplit(currentOffset, currentLength);
+        }
+      } catch (Throwable th) {
+        synchronized (context.errors) {
+          context.errors.add(th);
+        }
+      }
+    }
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job,
+                                int numSplits) throws IOException {
+    // use threads to resolve directories into splits
+    Context context = new Context(job);
+    for(Path dir: getInputPaths(job)) {
+      FileSystem fs = dir.getFileSystem(job);
+      context.schedule(new FileGenerator(context, fs, dir));
+    }
+    context.waitForTasks();
+    // deal with exceptions
+    if (!context.errors.isEmpty()) {
+      List<IOException> errors =
+          new ArrayList<IOException>(context.errors.size());
+      for(Throwable th: context.errors) {
+        if (th instanceof IOException) {
+          errors.add((IOException) th);
+        } else {
+          throw new IOException("serious problem", th);
+        }
+      }
+      throw new InvalidInputException(errors);
+    }
+    InputSplit[] result = new InputSplit[context.splits.size()];
+    context.splits.toArray(result);
+    return result;
+  }
 }
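
The heart of the change is SplitGenerator.run() above: adjacent stripes are
merged until the running length reaches the maximum split size, and a split is
also cut early once it is over the minimum size and the next stripe starts in a
different HDFS block. A simplified, standalone sketch of that grouping rule
(the stripe sizes are the ones used by TestInputOutputFormat below; everything
else is illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class StripeGroupingSketch {
      /** An (offset, length) pair standing in for a FileSplit. */
      static class Range {
        final long offset, length;
        Range(long offset, long length) { this.offset = offset; this.length = length; }
        @Override
        public String toString() { return "[" + offset + ", +" + length + ")"; }
      }

      /** Group stripe lengths into split ranges the way SplitGenerator.run() does. */
      static List<Range> group(long[] stripeLengths, long headerLen,
                               long minSize, long maxSize, long blockSize) {
        List<Range> splits = new ArrayList<Range>();
        long stripeOffset = headerLen;   // stripes start right after the header
        long currentOffset = -1;
        long currentLength = 0;
        for (long stripeLength : stripeLengths) {
          // over the minimum size and crossing a block boundary: cut here
          if (currentOffset != -1 && currentLength > minSize &&
              currentOffset / blockSize != stripeOffset / blockSize) {
            splits.add(new Range(currentOffset, currentLength));
            currentOffset = -1;
          }
          if (currentOffset == -1) {       // start a new split
            currentOffset = stripeOffset;
            currentLength = stripeLength;
          } else {                         // extend the current split
            currentLength += stripeLength;
          }
          if (currentLength >= maxSize) {  // reached the maximum: cut here
            splits.add(new Range(currentOffset, currentLength));
            currentOffset = -1;
          }
          stripeOffset += stripeLength;
        }
        if (currentOffset != -1) {
          splits.add(new Range(currentOffset, currentLength));
        }
        return splits;
      }

      public static void main(String[] args) {
        long[] stripes = {197, 300, 600, 200, 200, 100, 100, 100, 100, 100};
        // prints [[3, +497), [500, +600), [1100, +400), [1500, +300), [1800, +200)]
        System.out.println(group(stripes, 3, 200, 300, 500));
      }
    }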

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Sep 12 12:08:42 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -63,6 +64,11 @@ final class ReaderImpl implements Reader
     }
 
     @Override
+    public long getLength() {
+      return stripe.getDataLength() + getIndexLength() + getFooterLength();
+    }
+
+    @Override
     public long getDataLength() {
       return stripe.getDataLength();
     }
@@ -269,7 +275,7 @@ final class ReaderImpl implements Reader
     ByteBuffer buffer = ByteBuffer.allocate(readSize);
     file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
       buffer.remaining());
-    int psLen = buffer.get(readSize - 1);
+    int psLen = buffer.get(readSize - 1) & 0xff;
     ensureOrcFooter(file, path, psLen, buffer);
     int psOffset = readSize - 1 - psLen;
     CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
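
The "& 0xff" added to psLen above matters because Java bytes are signed: a
postscript longer than 127 bytes would otherwise come back as a negative
length. A tiny illustration (the value 200 is arbitrary):

    public class UnsignedByteExample {
      public static void main(String[] args) {
        byte raw = (byte) 200;        // e.g. a postscript length of 200 bytes
        int signed = raw;             // -56: sign-extended, unusable as a length
        int unsigned = raw & 0xff;    // 200: what ReaderImpl now computes
        System.out.println(signed + " vs " + unsigned);
      }
    }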

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java Thu Sep 12 12:08:42 2013
@@ -28,6 +28,12 @@ public interface StripeInformation {
   long getOffset();
 
   /**
+   * Get the total length of the stripe in bytes.
+   * @return the number of bytes in the stripe
+   */
+  long getLength();
+
+  /**
    * Get the length of the stripe's indexes.
    * @return the number of bytes in the index
    */
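
A short usage sketch for the new StripeInformation.getLength() accessor (the
file system and path are hypothetical): summing it over a reader's stripes
replaces adding the index, data, and footer lengths by hand, which is exactly
the simplification made in TestOrcFile further down.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

    public class StripeLengthExample {
      /** Total bytes covered by stripes (index + data + footer for each). */
      static long totalStripeBytes(FileSystem fs, Path path) throws IOException {
        Reader reader = OrcFile.createReader(fs, path);
        long total = 0;
        for (StripeInformation stripe : reader.getStripes()) {
          total += stripe.getLength();
        }
        return total;
      }
    }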

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Thu Sep 12 12:08:42 2013
@@ -17,29 +17,38 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,11 +56,17 @@ import org.junit.rules.TestName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 public class TestInputOutputFormat {
@@ -93,6 +108,409 @@ public class TestInputOutputFormat {
   }
 
   @Test
+  public void testOverlap() throws Exception {
+    assertEquals(0, OrcInputFormat.SplitGenerator.getOverlap(100, 100,
+        200, 100));
+    assertEquals(0, OrcInputFormat.SplitGenerator.getOverlap(0, 1000,
+        2000, 100));
+    assertEquals(100, OrcInputFormat.SplitGenerator.getOverlap(1000, 1000,
+        1500, 100));
+    assertEquals(250, OrcInputFormat.SplitGenerator.getOverlap(1000, 250,
+        500, 2000));
+    assertEquals(100, OrcInputFormat.SplitGenerator.getOverlap(1000, 1000,
+        1900, 1000));
+    assertEquals(500, OrcInputFormat.SplitGenerator.getOverlap(2000, 1000,
+        2500, 2000));
+  }
+
+  @Test
+  public void testGetInputPaths() throws Exception {
+    conf.set("mapred.input.dir", "a,b,c");
+    assertArrayEquals(new Path[]{new Path("a"), new Path("b"), new Path("c")},
+        OrcInputFormat.getInputPaths(conf));
+    conf.set("mapred.input.dir", "/a/b/c/d/e");
+    assertArrayEquals(new Path[]{new Path("/a/b/c/d/e")},
+        OrcInputFormat.getInputPaths(conf));
+    conf.set("mapred.input.dir", "/a/b/c\\,d,/e/f\\,g/h");
+    assertArrayEquals(new Path[]{new Path("/a/b/c,d"), new Path("/e/f,g/h")},
+        OrcInputFormat.getInputPaths(conf));
+  }
+
+  static class TestContext extends OrcInputFormat.Context {
+    List<Runnable> queue = new ArrayList<Runnable>();
+
+    TestContext(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public void schedule(Runnable runnable) {
+      queue.add(runnable);
+    }
+  }
+
+  @Test
+  public void testFileGenerator() throws Exception {
+    TestContext context = new TestContext(conf);
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("/a/b/part-00", 1000, new byte[0]),
+        new MockFile("/a/b/part-01", 1000, new byte[0]),
+        new MockFile("/a/b/_part-02", 1000, new byte[0]),
+        new MockFile("/a/b/.part-03", 1000, new byte[0]),
+        new MockFile("/a/b/part-04", 1000, new byte[0]));
+    OrcInputFormat.FileGenerator gen =
+      new OrcInputFormat.FileGenerator(context, fs, new Path("/a/b"));
+    gen.run();
+    if (context.getErrors().size() > 0) {
+      for(Throwable th: context.getErrors()) {
+        System.out.println(StringUtils.stringifyException(th));
+      }
+      throw new IOException("Errors during file generation");
+    }
+    assertEquals(-1, context.getSchedulers());
+    assertEquals(3, context.queue.size());
+    assertEquals(new Path("/a/b/part-00"),
+        ((OrcInputFormat.SplitGenerator) context.queue.get(0)).getPath());
+    assertEquals(new Path("/a/b/part-01"),
+        ((OrcInputFormat.SplitGenerator) context.queue.get(1)).getPath());
+    assertEquals(new Path("/a/b/part-04"),
+        ((OrcInputFormat.SplitGenerator) context.queue.get(2)).getPath());
+  }
+
+  static final Charset UTF8 = Charset.forName("UTF-8");
+
+  static class MockBlock {
+    int offset;
+    int length;
+    final String[] hosts;
+
+    MockBlock(String... hosts) {
+      this.hosts = hosts;
+    }
+  }
+
+  static class MockFile {
+    final Path path;
+    final int blockSize;
+    final int length;
+    final MockBlock[] blocks;
+    final byte[] content;
+
+    MockFile(String path, int blockSize, byte[] content, MockBlock... blocks) {
+      this.path = new Path(path);
+      this.blockSize = blockSize;
+      this.blocks = blocks;
+      this.content = content;
+      this.length = content.length;
+      int offset = 0;
+      for(MockBlock block: blocks) {
+        block.offset = offset;
+        block.length = Math.min(length - offset, blockSize);
+        offset += block.length;
+      }
+    }
+  }
+
+  static class MockInputStream extends FSInputStream {
+    final MockFile file;
+    int offset = 0;
+
+    public MockInputStream(MockFile file) throws IOException {
+      this.file = file;
+    }
+
+    @Override
+    public void seek(long offset) throws IOException {
+      this.offset = (int) offset;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset;
+    }
+
+    @Override
+    public boolean seekToNewSource(long l) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (offset < file.length) {
+        return file.content[offset++] & 0xff;
+      }
+      return -1;
+    }
+  }
+
+  static class MockFileSystem extends FileSystem {
+    final MockFile[] files;
+    Path workingDir = new Path("/");
+
+    MockFileSystem(Configuration conf, MockFile... files) {
+      setConf(conf);
+      this.files = files;
+    }
+
+    @Override
+    public URI getUri() {
+      try {
+        return new URI("mock:///");
+      } catch (URISyntaxException err) {
+        throw new IllegalArgumentException("huh?", err);
+      }
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int i) throws IOException {
+      for(MockFile file: files) {
+        if (file.path.equals(path)) {
+          return new FSDataInputStream(new MockInputStream(file));
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path, FsPermission fsPermission,
+                                     boolean b, int i, short i2, long l,
+                                     Progressable progressable
+                                     ) throws IOException {
+      throw new UnsupportedOperationException("no writes");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path, int i,
+                                     Progressable progressable
+                                     ) throws IOException {
+      throw new UnsupportedOperationException("no writes");
+    }
+
+    @Override
+    public boolean rename(Path path, Path path2) throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path) throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean delete(Path path, boolean b) throws IOException {
+      return false;
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      for(MockFile file: files) {
+        if (file.path.getParent().equals(path)) {
+          result.add(getFileStatus(file.path));
+        }
+      }
+      return result.toArray(new FileStatus[result.size()]);
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+      workingDir = path;
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return workingDir;
+    }
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) {
+      return false;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      for(MockFile file: files) {
+        if (file.path.equals(path)) {
+          return new FileStatus(file.length, false, 1, file.blockSize, 0, 0,
+              FsPermission.createImmutable((short) 644), "owen", "group",
+              file.path);
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(FileStatus stat,
+                                                 long start, long len) {
+      List<BlockLocation> result = new ArrayList<BlockLocation>();
+      for(MockFile file: files) {
+        if (file.path.equals(stat.getPath())) {
+          for(MockBlock block: file.blocks) {
+            if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
+                block.length, start, len) > 0) {
+              result.add(new BlockLocation(block.hosts, block.hosts,
+                  block.offset, block.length));
+            }
+          }
+          return result.toArray(new BlockLocation[result.size()]);
+        }
+      }
+      return new BlockLocation[0];
+    }
+  }
+
+  static void fill(DataOutputBuffer out, long length) throws IOException {
+    for(int i=0; i < length; ++i) {
+      out.write(0);
+    }
+  }
+
+  /**
+   * Create the binary contents of an ORC file that just has enough information
+   * to test getSplits.
+   * @param stripeLengths the length of each stripe
+   * @return the bytes of the file
+   * @throws IOException
+   */
+  static byte[] createMockOrcFile(long... stripeLengths) throws IOException {
+    OrcProto.Footer.Builder footer = OrcProto.Footer.newBuilder();
+    final long headerLen = 3;
+    long offset = headerLen;
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    for(long stripeLength: stripeLengths) {
+      footer.addStripes(OrcProto.StripeInformation.newBuilder()
+                          .setOffset(offset)
+                          .setIndexLength(0)
+                          .setDataLength(stripeLength-10)
+                          .setFooterLength(10)
+                          .setNumberOfRows(1000));
+      offset += stripeLength;
+    }
+    fill(buffer, offset);
+    footer.addTypes(OrcProto.Type.newBuilder()
+                     .setKind(OrcProto.Type.Kind.STRUCT)
+                     .addFieldNames("col1")
+                     .addSubtypes(1));
+    footer.addTypes(OrcProto.Type.newBuilder()
+        .setKind(OrcProto.Type.Kind.STRING));
+    footer.setNumberOfRows(1000 * stripeLengths.length)
+          .setHeaderLength(headerLen)
+          .setContentLength(offset - headerLen);
+    footer.build().writeTo(buffer);
+    int footerEnd = buffer.getLength();
+    OrcProto.PostScript ps =
+      OrcProto.PostScript.newBuilder()
+        .setCompression(OrcProto.CompressionKind.NONE)
+        .setFooterLength(footerEnd - offset)
+        .setMagic("ORC")
+        .build();
+    ps.writeTo(buffer);
+    buffer.write(buffer.getLength() - footerEnd);
+    byte[] result = new byte[buffer.getLength()];
+    System.arraycopy(buffer.getData(), 0, result, 0, buffer.getLength());
+    return result;
+  }
+
+  @Test
+  public void testAddSplit() throws Exception {
+    // create a file with 5 blocks spread around the cluster
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("/a/file", 500,
+          createMockOrcFile(197, 300, 600, 200, 200, 100, 100, 100, 100, 100),
+          new MockBlock("host1-1", "host1-2", "host1-3"),
+          new MockBlock("host2-1", "host0", "host2-3"),
+          new MockBlock("host0", "host3-2", "host3-3"),
+          new MockBlock("host4-1", "host4-2", "host4-3"),
+          new MockBlock("host5-1", "host5-2", "host5-3")));
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.SplitGenerator splitter =
+        new OrcInputFormat.SplitGenerator(context, fs,
+            fs.getFileStatus(new Path("/a/file")));
+    splitter.createSplit(0, 200);
+    FileSplit result = context.getResult(-1);
+    assertEquals(0, result.getStart());
+    assertEquals(200, result.getLength());
+    assertEquals("/a/file", result.getPath().toString());
+    String[] locs = result.getLocations();
+    assertEquals(3, locs.length);
+    assertEquals("host1-1", locs[0]);
+    assertEquals("host1-2", locs[1]);
+    assertEquals("host1-3", locs[2]);
+    splitter.createSplit(500, 600);
+    result = context.getResult(-1);
+    locs = result.getLocations();
+    assertEquals(3, locs.length);
+    assertEquals("host2-1", locs[0]);
+    assertEquals("host0", locs[1]);
+    assertEquals("host2-3", locs[2]);
+    splitter.createSplit(0, 2500);
+    result = context.getResult(-1);
+    locs = result.getLocations();
+    assertEquals(1, locs.length);
+    assertEquals("host0", locs[0]);
+  }
+
+  @Test
+  public void testSplitGenerator() throws Exception {
+    // create a file with 5 blocks spread around the cluster
+    long[] stripeSizes =
+        new long[]{197, 300, 600, 200, 200, 100, 100, 100, 100, 100};
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("/a/file", 500,
+          createMockOrcFile(stripeSizes),
+          new MockBlock("host1-1", "host1-2", "host1-3"),
+          new MockBlock("host2-1", "host0", "host2-3"),
+          new MockBlock("host0", "host3-2", "host3-3"),
+          new MockBlock("host4-1", "host4-2", "host4-3"),
+          new MockBlock("host5-1", "host5-2", "host5-3")));
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 300);
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200);
+    OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+    OrcInputFormat.SplitGenerator splitter =
+        new OrcInputFormat.SplitGenerator(context, fs,
+            fs.getFileStatus(new Path("/a/file")));
+    splitter.run();
+    if (context.getErrors().size() > 0) {
+      for(Throwable th: context.getErrors()) {
+        System.out.println(StringUtils.stringifyException(th));
+      }
+      throw new IOException("Errors during splitting");
+    }
+    FileSplit result = context.getResult(0);
+    assertEquals(3, result.getStart());
+    assertEquals(497, result.getLength());
+    result = context.getResult(1);
+    assertEquals(500, result.getStart());
+    assertEquals(600, result.getLength());
+    result = context.getResult(2);
+    assertEquals(1100, result.getStart());
+    assertEquals(400, result.getLength());
+    result = context.getResult(3);
+    assertEquals(1500, result.getStart());
+    assertEquals(300, result.getLength());
+    result = context.getResult(4);
+    assertEquals(1800, result.getStart());
+    assertEquals(200, result.getLength());
+    // test min = 0, max = 0 generates each stripe
+    conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0);
+    conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
+    context = new OrcInputFormat.Context(conf);
+    splitter = new OrcInputFormat.SplitGenerator(context, fs,
+      fs.getFileStatus(new Path("/a/file")));
+    splitter.run();
+    if (context.getErrors().size() > 0) {
+      for(Throwable th: context.getErrors()) {
+        System.out.println(StringUtils.stringifyException(th));
+      }
+      throw new IOException("Errors during splitting");
+    }
+    for(int i=0; i < stripeSizes.length; ++i) {
+      assertEquals("checking stripe " + i + " size",
+          stripeSizes[i], context.getResult(i).getLength());
+    }
+  }
+
+  @Test
   public void testInOutFormat() throws Exception {
     Properties properties = new Properties();
     StructObjectInspector inspector;
@@ -144,7 +562,7 @@ public class TestInputOutputFormat {
     IntObjectInspector intInspector =
         (IntObjectInspector) fields.get(0).getFieldObjectInspector();
     assertEquals(0.0, reader.getProgress(), 0.00001);
-    assertEquals(0, reader.getPos());
+    assertEquals(3, reader.getPos());
     while (reader.next(key, value)) {
       assertEquals(++rowNum, intInspector.get(inspector.
           getStructFieldData(serde.deserialize(value), fields.get(0))));
@@ -279,18 +697,7 @@ public class TestInputOutputFormat {
     InputFormat<?,?> in = new OrcInputFormat();
     FileInputFormat.setInputPaths(conf, testFilePath.toString());
     InputSplit[] splits = in.getSplits(conf, 1);
-    assertEquals(1, splits.length);
-
-    // read the whole file
-    conf.set("hive.io.file.readcolumn.ids", "0,1");
-    org.apache.hadoop.mapred.RecordReader reader =
-        in.getRecordReader(splits[0], conf, Reporter.NULL);
-    Object key = reader.createKey();
-    Object value = reader.createValue();
-    assertEquals(0.0, reader.getProgress(), 0.00001);
-    assertEquals(0, reader.getPos());
-    assertEquals(false, reader.next(key, value));
-    reader.close();
+    assertEquals(0, splits.length);
     assertEquals(null, serde.getSerDeStats());
   }
 

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Thu Sep 12 12:08:42 2013
@@ -1019,12 +1019,10 @@ public class TestOrcFile {
       stripeCount += 1;
       rowCount += stripe.getNumberOfRows();
       if (currentOffset < 0) {
-        currentOffset = stripe.getOffset() + stripe.getIndexLength() +
-            stripe.getDataLength() + stripe.getFooterLength();
+        currentOffset = stripe.getOffset() + stripe.getLength();
       } else {
         assertEquals(currentOffset, stripe.getOffset());
-        currentOffset += stripe.getIndexLength() +
-            stripe.getDataLength() + stripe.getFooterLength();
+        currentOffset += stripe.getLength();
       }
     }
     assertEquals(reader.getNumberOfRows(), rowCount);

Modified: hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Sep 12 12:08:42 2013
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -37,6 +38,7 @@ import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -631,6 +633,38 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
+  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+                                                final Path path,
+                                                final PathFilter filter
+                                               ) throws IOException {
+    return new Iterator<FileStatus>() {
+      private final FileStatus[] result = fs.listStatus(path, filter);
+      private int current = 0;
+
+      @Override
+      public boolean hasNext() {
+        return current < result.length;
+      }
+
+      @Override
+      public FileStatus next() {
+        return result[current++];
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalArgumentException("Not supported");
+      }
+    };
+  }
+
+  @Override
+  public BlockLocation[] getLocations(FileSystem fs,
+                                      FileStatus status) throws IOException {
+    return fs.getFileBlockLocations(status, 0, status.getLen());
+  }
+
+  @Override
   public boolean isSecurityEnabled() {
     return false;
   }

Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Sep 12 12:08:42 2013
@@ -21,11 +21,15 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker;
@@ -330,4 +334,37 @@ public class Hadoop20SShims extends Hado
   public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
     return new WebHCatJTShim20S(conf, ugi);//this has state, so can't be cached
   }
+
+  @Override
+  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+                                                final Path path,
+                                                final PathFilter filter
+  ) throws IOException {
+    return new Iterator<FileStatus>() {
+      private final FileStatus[] result = fs.listStatus(path, filter);
+      private int current = 0;
+
+      @Override
+      public boolean hasNext() {
+        return current < result.length;
+      }
+
+      @Override
+      public FileStatus next() {
+        return result[current++];
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalArgumentException("Not supported");
+      }
+    };
+  }
+
+  @Override
+  public BlockLocation[] getLocations(FileSystem fs,
+                                      FileStatus status) throws IOException {
+    return fs.getFileBlockLocations(status, 0, status.getLen());
+  }
+
 }

Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Sep 12 12:08:42 2013
@@ -22,12 +22,18 @@ import java.lang.Integer;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
@@ -339,4 +345,61 @@ public class Hadoop23Shims extends Hadoo
   public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
     return new WebHCatJTShim23(conf, ugi);//this has state, so can't be cached
   }
+
+  @Override
+  public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+                                                final Path path,
+                                                final PathFilter filter
+  ) throws IOException {
+    return new Iterator<FileStatus>() {
+      private final RemoteIterator<LocatedFileStatus> inner =
+          fs.listLocatedStatus(path);
+      private FileStatus next;
+      {
+        if (inner.hasNext()) {
+          next = inner.next();
+        } else {
+          next = null;
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      @Override
+      public FileStatus next() {
+        FileStatus result = next;
+        next = null;
+        try {
+          while (inner.hasNext() && next == null) {
+            next = inner.next();
+            if (filter != null && !filter.accept(next.getPath())) {
+              next = null;
+            }
+          }
+        } catch (IOException ioe) {
+          throw new IllegalArgumentException("Iterator exception", ioe);
+        }
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new IllegalArgumentException("Not supported");
+      }
+    };
+  }
+
+  @Override
+  public BlockLocation[] getLocations(FileSystem fs,
+                                      FileStatus status) throws IOException {
+    if (status instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) status).getBlockLocations();
+    } else {
+      return fs.getFileBlockLocations(status, 0, status.getLen());
+    }
+  }
+
 }
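
The two new shim methods are what OrcInputFormat calls so that split generation
works the same on Hadoop 0.20, 0.20S, and 0.23 (where listLocatedStatus returns
LocatedFileStatus objects and the block locations come for free). A usage
sketch built only from calls that appear in this patch; the directory path is
made up:

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class ShimLocationsExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path dir = new Path("/warehouse/my_orc_table");   // hypothetical path
        FileSystem fs = dir.getFileSystem(conf);
        HadoopShims shims = ShimLoader.getHadoopShims();
        // same hidden-file rule as OrcInputFormat.hiddenFileFilter
        PathFilter noHidden = new PathFilter() {
          public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
          }
        };
        Iterator<FileStatus> itr = shims.listLocatedStatus(fs, dir, noHidden);
        while (itr.hasNext()) {
          FileStatus file = itr.next();
          if (!file.isDir()) {
            for (BlockLocation block : shims.getLocations(fs, file)) {
              System.out.println(file.getPath() + " @" + block.getOffset()
                  + " on " + java.util.Arrays.toString(block.getHosts()));
            }
          }
        }
      }
    }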

Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Sep 12 12:08:42 2013
@@ -25,6 +25,7 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.security.auth.login.LoginException;
@@ -32,6 +33,7 @@ import javax.security.auth.login.LoginEx
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -468,6 +470,28 @@ public interface HadoopShims {
         Class<RecordReader<K, V>> rrClass) throws IOException;
   }
 
+  /**
+   * List the file statuses and block locations for the given directory.
+   * @param fs the file system
+   * @param path the directory name to get the status and block locations
+   * @param filter a filter that needs to accept the file (or null)
+   * @return an iterator for the located file status objects
+   * @throws IOException
+   */
+  Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
+                                         PathFilter filter) throws IOException;
+
+  /**
+   * For a file status returned by listLocatedStatus, get the list of its
+   * block locations.
+   * @param fs the file system
+   * @param status the file information
+   * @return the block locations of the file
+   * @throws IOException
+   */
+  BlockLocation[] getLocations(FileSystem fs,
+                               FileStatus status) throws IOException;
+
   public HCatHadoopShims getHCatShim();
   public interface HCatHadoopShims {