Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 14:08:43 UTC
svn commit: r1522541 - in /hive/trunk:
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/io/orc/
shims/src/0.20/java/org/apache/hadoop/hive/shims/
shims/src/0.20S/java/org/apache/hadoop/hive/shims/ shims/src/0.23/java/...
Author: hashutosh
Date: Thu Sep 12 12:08:42 2013
New Revision: 1522541
URL: http://svn.apache.org/r1522541
Log:
HIVE-5102 : ORC getSplits should create splits based on the stripes (Owen Omalley via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Sep 12 12:08:42 2013
@@ -18,41 +18,64 @@
package org.apache.hadoop.hive.ql.io.orc;
-import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* A MapReduce/Hive input format for ORC files.
*/
-public class OrcInputFormat extends FileInputFormat<NullWritable, OrcStruct>
- implements InputFormatChecker {
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+ InputFormatChecker {
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+ static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
+ static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+ private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
+ private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
+
+ /**
+ * When picking the hosts for a split that crosses block boundaries,
+ * drop any host that has fewer than MIN_INCLUDED_LOCATION times the
+ * number of bytes available on the host with the most.
+ * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB, the
+ * split will contain host2 (100% of host2) and host3 (90% of host2). Host1,
+ * with only 50% of host2's bytes, will be dropped.
+ */
+ private static final double MIN_INCLUDED_LOCATION = 0.80;
private static class OrcRecordReader
implements RecordReader<NullWritable, OrcStruct> {
@@ -137,10 +160,12 @@ public class OrcInputFormat extends Fil
}
}
- public OrcInputFormat() {
- // just set a really small lower bound
- setMinSplitSize(16 * 1024);
- }
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
/**
* Recurse down into a type subtree turning on all of the sub-columns.
@@ -148,7 +173,7 @@ public class OrcInputFormat extends Fil
* @param result the global view of columns that should be included
* @param typeId the root of tree to enable
*/
- private static void includeColumnRecursive(List<OrcProto.Type> types,
+ static void includeColumnRecursive(List<OrcProto.Type> types,
boolean[] result,
int typeId) {
result[typeId] = true;
@@ -165,7 +190,7 @@ public class OrcInputFormat extends Fil
* @param conf the configuration
* @return true for each column that should be included
*/
- private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
+ static boolean[] findIncludedColumns(List<OrcProto.Type> types,
Configuration conf) {
String includedStr =
conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
@@ -220,4 +245,324 @@ public class OrcInputFormat extends Fil
}
return true;
}
+
+ /**
+ * Get the list of input {@link Path}s for the map-reduce job.
+ *
+ * @param conf The configuration of the job
+ * @return the list of input {@link Path}s for the map-reduce job.
+ */
+ static Path[] getInputPaths(JobConf conf) throws IOException {
+ String dirs = conf.get("mapred.input.dir");
+ if (dirs == null) {
+ throw new IOException("Configuration mapred.input.dir is not defined.");
+ }
+ String [] list = StringUtils.split(dirs);
+ Path[] result = new Path[list.length];
+ for (int i = 0; i < list.length; i++) {
+ result[i] = new Path(StringUtils.unEscapeString(list[i]));
+ }
+ return result;
+ }
+
+ /**
+ * The global information about the split generation that we pass around to
+ * the different worker threads.
+ */
+ static class Context {
+ private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
+ private final List<FileSplit> splits = new ArrayList<FileSplit>(10000);
+ private final List<Throwable> errors = new ArrayList<Throwable>();
+ private final HadoopShims shims = ShimLoader.getHadoopShims();
+ private final Configuration conf;
+ private final long maxSize;
+ private final long minSize;
+
+ /**
+ * A count of the number of threads that may create more work for the
+ * thread pool.
+ */
+ private int schedulers = 0;
+
+ Context(Configuration conf) {
+ this.conf = conf;
+ minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
+ maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+ }
+
+ int getSchedulers() {
+ return schedulers;
+ }
+
+ /**
+ * Get the Nth split.
+ * @param index if index >= 0, count from the front, otherwise count from
+ * the back.
+ * @return the Nth file split
+ */
+ FileSplit getResult(int index) {
+ if (index >= 0) {
+ return splits.get(index);
+ } else {
+ return splits.get(splits.size() + index);
+ }
+ }
+
+ List<Throwable> getErrors() {
+ return errors;
+ }
+
+ /**
+ * Add a unit of work.
+ * @param runnable the object to run
+ */
+ synchronized void schedule(Runnable runnable) {
+ if (runnable instanceof FileGenerator) {
+ schedulers += 1;
+ }
+ threadPool.execute(runnable);
+ }
+
+ /**
+ * Mark a worker that may generate more work as done.
+ */
+ synchronized void decrementSchedulers() {
+ schedulers -= 1;
+ if (schedulers == 0) {
+ notify();
+ }
+ }
+
+ /**
+ * Wait until all of the tasks are done. It waits until all of the
+ * threads that may create more work are done and then shuts down the
+ * thread pool and waits for the final threads to finish.
+ */
+ synchronized void waitForTasks() {
+ try {
+ while (schedulers != 0) {
+ wait();
+ }
+ threadPool.shutdown();
+ threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+ } catch (InterruptedException ie) {
+ throw new IllegalStateException("interrupted", ie);
+ }
+ }
+ }
+
+ /**
+ * Given a directory, get the list of files and blocks in those files.
+ * A thread is used for each directory.
+ */
+ static final class FileGenerator implements Runnable {
+ private final Context context;
+ private final FileSystem fs;
+ private final Path dir;
+
+ FileGenerator(Context context, FileSystem fs, Path dir) {
+ this.context = context;
+ this.fs = fs;
+ this.dir = dir;
+ }
+
+ /**
+ * List the files in the directory and schedule a SplitGenerator for each.
+ */
+ @Override
+ public void run() {
+ try {
+ Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
+ hiddenFileFilter);
+ while (itr.hasNext()) {
+ FileStatus file = itr.next();
+ if (!file.isDir()) {
+ context.schedule(new SplitGenerator(context, fs, file));
+ }
+ }
+ // mark the fact that we are done
+ context.decrementSchedulers();
+ } catch (Throwable th) {
+ context.decrementSchedulers();
+ synchronized (context.errors) {
+ context.errors.add(th);
+ }
+ }
+ }
+ }
+
+ /**
+ * Split the stripes of a given file into input splits.
+ * A thread is used for each file.
+ */
+ static final class SplitGenerator implements Runnable {
+ private final Context context;
+ private final FileSystem fs;
+ private final FileStatus file;
+ private final long blockSize;
+ private final BlockLocation[] locations;
+
+ SplitGenerator(Context context, FileSystem fs,
+ FileStatus file) throws IOException {
+ this.context = context;
+ this.fs = fs;
+ this.file = file;
+ this.blockSize = file.getBlockSize();
+ locations = context.shims.getLocations(fs, file);
+ }
+
+ Path getPath() {
+ return file.getPath();
+ }
+
+ @Override
+ public String toString() {
+ return "splitter(" + file.getPath() + ")";
+ }
+
+ /**
+ * Compute the number of bytes that overlap between the two ranges.
+ * @param offset1 start of range1
+ * @param length1 length of range1
+ * @param offset2 start of range2
+ * @param length2 length of range2
+ * @return the number of bytes in the overlap range
+ */
+ static long getOverlap(long offset1, long length1,
+ long offset2, long length2) {
+ long end1 = offset1 + length1;
+ long end2 = offset2 + length2;
+ if (end2 <= offset1 || end1 <= offset2) {
+ return 0;
+ } else {
+ return Math.min(end1, end2) - Math.max(offset1, offset2);
+ }
+ }
+
+ /**
+ * Create an input split over the given range of bytes. The location of the
+ * split is based on where the majority of the bytes are coming from. ORC
+ * files are unlikely to have splits that cross between blocks because they
+ * are written with large block sizes.
+ * @param offset the start of the split
+ * @param length the length of the split
+ * @throws IOException
+ */
+ void createSplit(long offset, long length) throws IOException {
+ String[] hosts;
+ if ((offset % blockSize) + length <= blockSize) {
+ // handle the single block case
+ hosts = locations[(int) (offset / blockSize)].getHosts();
+ } else {
+ // Calculate the number of bytes in the split that are local to each
+ // host.
+ Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
+ long maxSize = 0;
+ for(BlockLocation block: locations) {
+ long overlap = getOverlap(offset, length, block.getOffset(),
+ block.getLength());
+ if (overlap > 0) {
+ for(String host: block.getHosts()) {
+ LongWritable val = sizes.get(host);
+ if (val == null) {
+ val = new LongWritable();
+ sizes.put(host, val);
+ }
+ val.set(val.get() + overlap);
+ maxSize = Math.max(maxSize, val.get());
+ }
+ }
+ }
+ // filter the list of locations to those that have at least 80% of the
+ // max
+ long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
+ List<String> hostList = new ArrayList<String>();
+ // build the locations in a predictable order to simplify testing
+ for(BlockLocation block: locations) {
+ for(String host: block.getHosts()) {
+ if (sizes.containsKey(host)) {
+ if (sizes.get(host).get() >= threshold) {
+ hostList.add(host);
+ }
+ sizes.remove(host);
+ }
+ }
+ }
+ hosts = new String[hostList.size()];
+ hostList.toArray(hosts);
+ }
+ synchronized (context.splits) {
+ context.splits.add(new FileSplit(file.getPath(), offset, length,
+ hosts));
+ }
+ }
+
+ /**
+ * Divide the adjacent stripes in the file into input splits based on the
+ * block size and the configured minimum and maximum sizes.
+ */
+ @Override
+ public void run() {
+ try {
+ Reader orcReader = OrcFile.createReader(fs, file.getPath());
+ long currentOffset = -1;
+ long currentLength = 0;
+ for(StripeInformation stripe: orcReader.getStripes()) {
+ // if we are building a split that is over the minimum split size and
+ // this stripe starts in a different block, cut the input split here.
+ if (currentOffset != -1 && currentLength > context.minSize &&
+ (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
+ createSplit(currentOffset, currentLength);
+ currentOffset = -1;
+ }
+ // if we aren't building a split, start a new one.
+ if (currentOffset == -1) {
+ currentOffset = stripe.getOffset();
+ currentLength = stripe.getLength();
+ } else {
+ currentLength += stripe.getLength();
+ }
+ if (currentLength >= context.maxSize) {
+ createSplit(currentOffset, currentLength);
+ currentOffset = -1;
+ }
+ }
+ if (currentOffset != -1) {
+ createSplit(currentOffset, currentLength);
+ }
+ } catch (Throwable th) {
+ synchronized (context.errors) {
+ context.errors.add(th);
+ }
+ }
+ }
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job,
+ int numSplits) throws IOException {
+ // use threads to resolve directories into splits
+ Context context = new Context(job);
+ for(Path dir: getInputPaths(job)) {
+ FileSystem fs = dir.getFileSystem(job);
+ context.schedule(new FileGenerator(context, fs, dir));
+ }
+ context.waitForTasks();
+ // deal with exceptions
+ if (!context.errors.isEmpty()) {
+ List<IOException> errors =
+ new ArrayList<IOException>(context.errors.size());
+ for(Throwable th: context.errors) {
+ if (th instanceof IOException) {
+ errors.add((IOException) th);
+ } else {
+ throw new IOException("serious problem", th);
+ }
+ }
+ throw new InvalidInputException(errors);
+ }
+ InputSplit[] result = new InputSplit[context.splits.size()];
+ context.splits.toArray(result);
+ return result;
+ }
}
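The core of this change is SplitGenerator.run() above: instead of letting
FileInputFormat cut splits at arbitrary byte offsets, adjacent stripes are
accumulated into a split until it reaches the configured maximum size, or
until it exceeds the minimum size and the next stripe starts in a new HDFS
block. A standalone sketch of that grouping rule, using hypothetical stripe
offsets and lengths (the same shape as the testSplitGenerator case later in
this mail: 500-byte blocks, minSize=200, maxSize=300):

    // A minimal sketch of the stripe-grouping rule in SplitGenerator.run().
    // The stripe offsets/lengths and size limits are hypothetical.
    import java.util.ArrayList;
    import java.util.List;

    public class StripeGrouping {
      public static void main(String[] args) {
        long blockSize = 500, minSize = 200, maxSize = 300;
        long[][] stripes = {{3, 197}, {200, 300}, {500, 600}, {1100, 200},
            {1300, 200}, {1500, 100}, {1600, 100}, {1700, 100},
            {1800, 100}, {1900, 100}};  // {offset, length} pairs
        List<long[]> splits = new ArrayList<long[]>();
        long currentOffset = -1, currentLength = 0;
        for (long[] stripe : stripes) {
          // cut the split if it is over the min size and this stripe
          // starts in a different block
          if (currentOffset != -1 && currentLength > minSize &&
              currentOffset / blockSize != stripe[0] / blockSize) {
            splits.add(new long[]{currentOffset, currentLength});
            currentOffset = -1;
          }
          if (currentOffset == -1) {       // start a new split
            currentOffset = stripe[0];
            currentLength = stripe[1];
          } else {                         // extend the current split
            currentLength += stripe[1];
          }
          if (currentLength >= maxSize) {  // cut at the max size
            splits.add(new long[]{currentOffset, currentLength});
            currentOffset = -1;
          }
        }
        if (currentOffset != -1) {         // flush the trailing split
          splits.add(new long[]{currentOffset, currentLength});
        }
        for (long[] s : splits) {
          System.out.println("split at " + s[0] + " length " + s[1]);
        }
      }
    }

Run as-is, this prints splits at (3,497), (500,600), (1100,400), (1500,300),
and (1800,200), the same boundaries that testSplitGenerator asserts.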
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Sep 12 12:08:42 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
@@ -63,6 +64,11 @@ final class ReaderImpl implements Reader
}
@Override
+ public long getLength() {
+ return stripe.getDataLength() + getIndexLength() + getFooterLength();
+ }
+
+ @Override
public long getDataLength() {
return stripe.getDataLength();
}
@@ -269,7 +275,7 @@ final class ReaderImpl implements Reader
ByteBuffer buffer = ByteBuffer.allocate(readSize);
file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
- int psLen = buffer.get(readSize - 1);
+ int psLen = buffer.get(readSize - 1) & 0xff;
ensureOrcFooter(file, path, psLen, buffer);
int psOffset = readSize - 1 - psLen;
CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
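The one-character ReaderImpl fix above guards against sign extension: Java
bytes are signed, so reading the postscript length byte without a mask would
turn any postscript longer than 127 bytes into a negative length. A minimal
illustration (the value is hypothetical):

    // Why the "& 0xff" mask matters when reading the postscript length.
    import java.nio.ByteBuffer;

    public class SignExtension {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[]{(byte) 200});
        int bad = buffer.get(0);          // sign-extended to -56
        int good = buffer.get(0) & 0xff;  // masked to the unsigned value 200
        System.out.println(bad + " vs " + good);
      }
    }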
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StripeInformation.java Thu Sep 12 12:08:42 2013
@@ -28,6 +28,12 @@ public interface StripeInformation {
long getOffset();
/**
+ * Get the total length of the stripe in bytes.
+ * @return the number of bytes in the stripe
+ */
+ long getLength();
+
+ /**
* Get the length of the stripe's indexes.
* @return the number of bytes in the index
*/
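With getLength() on the interface, the stripes of a well-formed file can be
walked with a single running offset, which is exactly how the TestOrcFile
change later in this mail simplifies. A sketch of that contiguity check,
assuming a Reader from this package is in hand:

    // Sketch: verify the stripes are laid out back-to-back using the new
    // getLength(), instead of summing index, data, and footer lengths.
    static void checkContiguous(Reader reader) {
      long expectedOffset = -1;
      for (StripeInformation stripe : reader.getStripes()) {
        if (expectedOffset >= 0 && stripe.getOffset() != expectedOffset) {
          throw new IllegalStateException(
              "gap before stripe at " + stripe.getOffset());
        }
        expectedOffset = stripe.getOffset() + stripe.getLength();
      }
    }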
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Thu Sep 12 12:08:42 2013
@@ -17,29 +17,38 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -47,11 +56,17 @@ import org.junit.rules.TestName;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class TestInputOutputFormat {
@@ -93,6 +108,409 @@ public class TestInputOutputFormat {
}
@Test
+ public void testOverlap() throws Exception {
+ assertEquals(0, OrcInputFormat.SplitGenerator.getOverlap(100, 100,
+ 200, 100));
+ assertEquals(0, OrcInputFormat.SplitGenerator.getOverlap(0, 1000,
+ 2000, 100));
+ assertEquals(100, OrcInputFormat.SplitGenerator.getOverlap(1000, 1000,
+ 1500, 100));
+ assertEquals(250, OrcInputFormat.SplitGenerator.getOverlap(1000, 250,
+ 500, 2000));
+ assertEquals(100, OrcInputFormat.SplitGenerator.getOverlap(1000, 1000,
+ 1900, 1000));
+ assertEquals(500, OrcInputFormat.SplitGenerator.getOverlap(2000, 1000,
+ 2500, 2000));
+ }
+
+ @Test
+ public void testGetInputPaths() throws Exception {
+ conf.set("mapred.input.dir", "a,b,c");
+ assertArrayEquals(new Path[]{new Path("a"), new Path("b"), new Path("c")},
+ OrcInputFormat.getInputPaths(conf));
+ conf.set("mapred.input.dir", "/a/b/c/d/e");
+ assertArrayEquals(new Path[]{new Path("/a/b/c/d/e")},
+ OrcInputFormat.getInputPaths(conf));
+ conf.set("mapred.input.dir", "/a/b/c\\,d,/e/f\\,g/h");
+ assertArrayEquals(new Path[]{new Path("/a/b/c,d"), new Path("/e/f,g/h")},
+ OrcInputFormat.getInputPaths(conf));
+ }
+
+ static class TestContext extends OrcInputFormat.Context {
+ List<Runnable> queue = new ArrayList<Runnable>();
+
+ TestContext(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public void schedule(Runnable runnable) {
+ queue.add(runnable);
+ }
+ }
+
+ @Test
+ public void testFileGenerator() throws Exception {
+ TestContext context = new TestContext(conf);
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("/a/b/part-00", 1000, new byte[0]),
+ new MockFile("/a/b/part-01", 1000, new byte[0]),
+ new MockFile("/a/b/_part-02", 1000, new byte[0]),
+ new MockFile("/a/b/.part-03", 1000, new byte[0]),
+ new MockFile("/a/b/part-04", 1000, new byte[0]));
+ OrcInputFormat.FileGenerator gen =
+ new OrcInputFormat.FileGenerator(context, fs, new Path("/a/b"));
+ gen.run();
+ if (context.getErrors().size() > 0) {
+ for(Throwable th: context.getErrors()) {
+ System.out.println(StringUtils.stringifyException(th));
+ }
+ throw new IOException("Errors during file generation");
+ }
+ assertEquals(-1, context.getSchedulers());
+ assertEquals(3, context.queue.size());
+ assertEquals(new Path("/a/b/part-00"),
+ ((OrcInputFormat.SplitGenerator) context.queue.get(0)).getPath());
+ assertEquals(new Path("/a/b/part-01"),
+ ((OrcInputFormat.SplitGenerator) context.queue.get(1)).getPath());
+ assertEquals(new Path("/a/b/part-04"),
+ ((OrcInputFormat.SplitGenerator) context.queue.get(2)).getPath());
+ }
+
+ static final Charset UTF8 = Charset.forName("UTF-8");
+
+ static class MockBlock {
+ int offset;
+ int length;
+ final String[] hosts;
+
+ MockBlock(String... hosts) {
+ this.hosts = hosts;
+ }
+ }
+
+ static class MockFile {
+ final Path path;
+ final int blockSize;
+ final int length;
+ final MockBlock[] blocks;
+ final byte[] content;
+
+ MockFile(String path, int blockSize, byte[] content, MockBlock... blocks) {
+ this.path = new Path(path);
+ this.blockSize = blockSize;
+ this.blocks = blocks;
+ this.content = content;
+ this.length = content.length;
+ int offset = 0;
+ for(MockBlock block: blocks) {
+ block.offset = offset;
+ block.length = Math.min(length - offset, blockSize);
+ offset += block.length;
+ }
+ }
+ }
+
+ static class MockInputStream extends FSInputStream {
+ final MockFile file;
+ int offset = 0;
+
+ public MockInputStream(MockFile file) throws IOException {
+ this.file = file;
+ }
+
+ @Override
+ public void seek(long offset) throws IOException {
+ this.offset = (int) offset;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return offset;
+ }
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (offset < file.length) {
+ return file.content[offset++] & 0xff;
+ }
+ return -1;
+ }
+ }
+
+ static class MockFileSystem extends FileSystem {
+ final MockFile[] files;
+ Path workingDir = new Path("/");
+
+ MockFileSystem(Configuration conf, MockFile... files) {
+ setConf(conf);
+ this.files = files;
+ }
+
+ @Override
+ public URI getUri() {
+ try {
+ return new URI("mock:///");
+ } catch (URISyntaxException err) {
+ throw new IllegalArgumentException("huh?", err);
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ for(MockFile file: files) {
+ if (file.path.equals(path)) {
+ return new FSDataInputStream(new MockInputStream(file));
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission,
+ boolean b, int i, short i2, long l,
+ Progressable progressable
+ ) throws IOException {
+ throw new UnsupportedOperationException("no writes");
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i,
+ Progressable progressable
+ ) throws IOException {
+ throw new UnsupportedOperationException("no writes");
+ }
+
+ @Override
+ public boolean rename(Path path, Path path2) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for(MockFile file: files) {
+ if (file.path.getParent().equals(path)) {
+ result.add(getFileStatus(file.path));
+ }
+ }
+ return result.toArray(new FileStatus[result.size()]);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+ workingDir = path;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ for(MockFile file: files) {
+ if (file.path.equals(path)) {
+ return new FileStatus(file.length, false, 1, file.blockSize, 0, 0,
+ FsPermission.createImmutable((short) 0644), "owen", "group",
+ file.path);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus stat,
+ long start, long len) {
+ List<BlockLocation> result = new ArrayList<BlockLocation>();
+ for(MockFile file: files) {
+ if (file.path.equals(stat.getPath())) {
+ for(MockBlock block: file.blocks) {
+ if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
+ block.length, start, len) > 0) {
+ result.add(new BlockLocation(block.hosts, block.hosts,
+ block.offset, block.length));
+ }
+ }
+ return result.toArray(new BlockLocation[result.size()]);
+ }
+ }
+ return new BlockLocation[0];
+ }
+ }
+
+ static void fill(DataOutputBuffer out, long length) throws IOException {
+ for(int i=0; i < length; ++i) {
+ out.write(0);
+ }
+ }
+
+ /**
+ * Create the binary contents of an ORC file that just has enough information
+ * to test getSplits().
+ * @param stripeLengths the length of each stripe
+ * @return the bytes of the file
+ * @throws IOException
+ */
+ static byte[] createMockOrcFile(long... stripeLengths) throws IOException {
+ OrcProto.Footer.Builder footer = OrcProto.Footer.newBuilder();
+ final long headerLen = 3;
+ long offset = headerLen;
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ for(long stripeLength: stripeLengths) {
+ footer.addStripes(OrcProto.StripeInformation.newBuilder()
+ .setOffset(offset)
+ .setIndexLength(0)
+ .setDataLength(stripeLength-10)
+ .setFooterLength(10)
+ .setNumberOfRows(1000));
+ offset += stripeLength;
+ }
+ fill(buffer, offset);
+ footer.addTypes(OrcProto.Type.newBuilder()
+ .setKind(OrcProto.Type.Kind.STRUCT)
+ .addFieldNames("col1")
+ .addSubtypes(1));
+ footer.addTypes(OrcProto.Type.newBuilder()
+ .setKind(OrcProto.Type.Kind.STRING));
+ footer.setNumberOfRows(1000 * stripeLengths.length)
+ .setHeaderLength(headerLen)
+ .setContentLength(offset - headerLen);
+ footer.build().writeTo(buffer);
+ int footerEnd = buffer.getLength();
+ OrcProto.PostScript ps =
+ OrcProto.PostScript.newBuilder()
+ .setCompression(OrcProto.CompressionKind.NONE)
+ .setFooterLength(footerEnd - offset)
+ .setMagic("ORC")
+ .build();
+ ps.writeTo(buffer);
+ buffer.write(buffer.getLength() - footerEnd);
+ byte[] result = new byte[buffer.getLength()];
+ System.arraycopy(buffer.getData(), 0, result, 0, buffer.getLength());
+ return result;
+ }
+
+ @Test
+ public void testAddSplit() throws Exception {
+ // create a file with 5 blocks spread around the cluster
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("/a/file", 500,
+ createMockOrcFile(197, 300, 600, 200, 200, 100, 100, 100, 100, 100),
+ new MockBlock("host1-1", "host1-2", "host1-3"),
+ new MockBlock("host2-1", "host0", "host2-3"),
+ new MockBlock("host0", "host3-2", "host3-3"),
+ new MockBlock("host4-1", "host4-2", "host4-3"),
+ new MockBlock("host5-1", "host5-2", "host5-3")));
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+ OrcInputFormat.SplitGenerator splitter =
+ new OrcInputFormat.SplitGenerator(context, fs,
+ fs.getFileStatus(new Path("/a/file")));
+ splitter.createSplit(0, 200);
+ FileSplit result = context.getResult(-1);
+ assertEquals(0, result.getStart());
+ assertEquals(200, result.getLength());
+ assertEquals("/a/file", result.getPath().toString());
+ String[] locs = result.getLocations();
+ assertEquals(3, locs.length);
+ assertEquals("host1-1", locs[0]);
+ assertEquals("host1-2", locs[1]);
+ assertEquals("host1-3", locs[2]);
+ splitter.createSplit(500, 600);
+ result = context.getResult(-1);
+ locs = result.getLocations();
+ assertEquals(3, locs.length);
+ assertEquals("host2-1", locs[0]);
+ assertEquals("host0", locs[1]);
+ assertEquals("host2-3", locs[2]);
+ splitter.createSplit(0, 2500);
+ result = context.getResult(-1);
+ locs = result.getLocations();
+ assertEquals(1, locs.length);
+ assertEquals("host0", locs[0]);
+ }
+
+ @Test
+ public void testSplitGenerator() throws Exception {
+ // create a file with 5 blocks spread around the cluster
+ long[] stripeSizes =
+ new long[]{197, 300, 600, 200, 200, 100, 100, 100, 100, 100};
+ MockFileSystem fs = new MockFileSystem(conf,
+ new MockFile("/a/file", 500,
+ createMockOrcFile(stripeSizes),
+ new MockBlock("host1-1", "host1-2", "host1-3"),
+ new MockBlock("host2-1", "host0", "host2-3"),
+ new MockBlock("host0", "host3-2", "host3-3"),
+ new MockBlock("host4-1", "host4-2", "host4-3"),
+ new MockBlock("host5-1", "host5-2", "host5-3")));
+ conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 300);
+ conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 200);
+ OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
+ OrcInputFormat.SplitGenerator splitter =
+ new OrcInputFormat.SplitGenerator(context, fs,
+ fs.getFileStatus(new Path("/a/file")));
+ splitter.run();
+ if (context.getErrors().size() > 0) {
+ for(Throwable th: context.getErrors()) {
+ System.out.println(StringUtils.stringifyException(th));
+ }
+ throw new IOException("Errors during splitting");
+ }
+ FileSplit result = context.getResult(0);
+ assertEquals(3, result.getStart());
+ assertEquals(497, result.getLength());
+ result = context.getResult(1);
+ assertEquals(500, result.getStart());
+ assertEquals(600, result.getLength());
+ result = context.getResult(2);
+ assertEquals(1100, result.getStart());
+ assertEquals(400, result.getLength());
+ result = context.getResult(3);
+ assertEquals(1500, result.getStart());
+ assertEquals(300, result.getLength());
+ result = context.getResult(4);
+ assertEquals(1800, result.getStart());
+ assertEquals(200, result.getLength());
+ // test min = 0, max = 0 generates each stripe
+ conf.setInt(OrcInputFormat.MIN_SPLIT_SIZE, 0);
+ conf.setInt(OrcInputFormat.MAX_SPLIT_SIZE, 0);
+ context = new OrcInputFormat.Context(conf);
+ splitter = new OrcInputFormat.SplitGenerator(context, fs,
+ fs.getFileStatus(new Path("/a/file")));
+ splitter.run();
+ if (context.getErrors().size() > 0) {
+ for(Throwable th: context.getErrors()) {
+ System.out.println(StringUtils.stringifyException(th));
+ }
+ throw new IOException("Errors during splitting");
+ }
+ for(int i=0; i < stripeSizes.length; ++i) {
+ assertEquals("checking stripe " + i + " size",
+ stripeSizes[i], context.getResult(i).getLength());
+ }
+ }
+
+ @Test
public void testInOutFormat() throws Exception {
Properties properties = new Properties();
StructObjectInspector inspector;
@@ -144,7 +562,7 @@ public class TestInputOutputFormat {
IntObjectInspector intInspector =
(IntObjectInspector) fields.get(0).getFieldObjectInspector();
assertEquals(0.0, reader.getProgress(), 0.00001);
- assertEquals(0, reader.getPos());
+ assertEquals(3, reader.getPos());
while (reader.next(key, value)) {
assertEquals(++rowNum, intInspector.get(inspector.
getStructFieldData(serde.deserialize(value), fields.get(0))));
@@ -279,18 +697,7 @@ public class TestInputOutputFormat {
InputFormat<?,?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, testFilePath.toString());
InputSplit[] splits = in.getSplits(conf, 1);
- assertEquals(1, splits.length);
-
- // read the whole file
- conf.set("hive.io.file.readcolumn.ids", "0,1");
- org.apache.hadoop.mapred.RecordReader reader =
- in.getRecordReader(splits[0], conf, Reporter.NULL);
- Object key = reader.createKey();
- Object value = reader.createValue();
- assertEquals(0.0, reader.getProgress(), 0.00001);
- assertEquals(0, reader.getPos());
- assertEquals(false, reader.next(key, value));
- reader.close();
+ assertEquals(0, splits.length);
assertEquals(null, serde.getSerDeStats());
}
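testAddSplit above ends by exercising the host-weighting path of
createSplit(): the 2500-byte split spans all five blocks, and only "host0",
which appears in two of them, clears the 80% threshold. The arithmetic from
the MIN_INCLUDED_LOCATION javadoc can be reproduced by hand (the byte counts
are hypothetical):

    // Worked version of the 80% host filter used by createSplit().
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class HostFilter {
      public static void main(String[] args) {
        // bytes of the split that are local to each host (hypothetical)
        Map<String, Long> sizes = new LinkedHashMap<String, Long>();
        sizes.put("host1", 10L << 20);  // 10MB
        sizes.put("host2", 20L << 20);  // 20MB, the maximum
        sizes.put("host3", 18L << 20);  // 18MB
        long maxSize = 0;
        for (long size : sizes.values()) {
          maxSize = Math.max(maxSize, size);
        }
        long threshold = (long) (maxSize * 0.80);  // MIN_INCLUDED_LOCATION
        for (Map.Entry<String, Long> e : sizes.entrySet()) {
          System.out.println(e.getKey() +
              (e.getValue() >= threshold ? " kept" : " dropped"));
        }
        // prints: host1 dropped, host2 kept, host3 kept
      }
    }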
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Thu Sep 12 12:08:42 2013
@@ -1019,12 +1019,10 @@ public class TestOrcFile {
stripeCount += 1;
rowCount += stripe.getNumberOfRows();
if (currentOffset < 0) {
- currentOffset = stripe.getOffset() + stripe.getIndexLength() +
- stripe.getDataLength() + stripe.getFooterLength();
+ currentOffset = stripe.getOffset() + stripe.getLength();
} else {
assertEquals(currentOffset, stripe.getOffset());
- currentOffset += stripe.getIndexLength() +
- stripe.getDataLength() + stripe.getFooterLength();
+ currentOffset += stripe.getLength();
}
}
assertEquals(reader.getNumberOfRows(), rowCount);
Modified: hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Sep 12 12:08:42 2013
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -37,6 +38,7 @@ import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -631,6 +633,38 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+ final Path path,
+ final PathFilter filter
+ ) throws IOException {
+ return new Iterator<FileStatus>() {
+ private final FileStatus[] result = fs.listStatus(path, filter);
+ private int current = 0;
+
+ @Override
+ public boolean hasNext() {
+ return current < result.length;
+ }
+
+ @Override
+ public FileStatus next() {
+ return result[current++];
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalArgumentException("Not supported");
+ }
+ };
+ }
+
+ @Override
+ public BlockLocation[] getLocations(FileSystem fs,
+ FileStatus status) throws IOException {
+ return fs.getFileBlockLocations(status, 0, status.getLen());
+ }
+
+ @Override
public boolean isSecurityEnabled() {
return false;
}
Modified: hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Sep 12 12:08:42 2013
@@ -21,11 +21,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker;
@@ -330,4 +334,37 @@ public class Hadoop20SShims extends Hado
public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
return new WebHCatJTShim20S(conf, ugi);//this has state, so can't be cached
}
+
+ @Override
+ public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+ final Path path,
+ final PathFilter filter
+ ) throws IOException {
+ return new Iterator<FileStatus>() {
+ private final FileStatus[] result = fs.listStatus(path, filter);
+ private int current = 0;
+
+ @Override
+ public boolean hasNext() {
+ return current < result.length;
+ }
+
+ @Override
+ public FileStatus next() {
+ return result[current++];
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalArgumentException("Not supported");
+ }
+ };
+ }
+
+ @Override
+ public BlockLocation[] getLocations(FileSystem fs,
+ FileStatus status) throws IOException {
+ return fs.getFileBlockLocations(status, 0, status.getLen());
+ }
+
}
Modified: hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Sep 12 12:08:42 2013
@@ -22,12 +22,18 @@ import java.lang.Integer;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -339,4 +345,61 @@ public class Hadoop23Shims extends Hadoo
public WebHCatJTShim getWebHCatShim(Configuration conf, UserGroupInformation ugi) throws IOException {
return new WebHCatJTShim23(conf, ugi);//this has state, so can't be cached
}
+
+ @Override
+ public Iterator<FileStatus> listLocatedStatus(final FileSystem fs,
+ final Path path,
+ final PathFilter filter
+ ) throws IOException {
+ return new Iterator<FileStatus>() {
+ private final RemoteIterator<LocatedFileStatus> inner =
+ fs.listLocatedStatus(path);
+ private FileStatus next;
+ {
+ // advance to the first file that passes the filter
+ while (inner.hasNext() && next == null) {
+ next = inner.next();
+ if (filter != null && !filter.accept(next.getPath())) {
+ next = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public FileStatus next() {
+ FileStatus result = next;
+ next = null;
+ try {
+ while (inner.hasNext() && next == null) {
+ next = inner.next();
+ if (filter != null && !filter.accept(next.getPath())) {
+ next = null;
+ }
+ }
+ } catch (IOException ioe) {
+ throw new IllegalArgumentException("Iterator exception", ioe);
+ }
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new IllegalArgumentException("Not supported");
+ }
+ };
+ }
+
+ @Override
+ public BlockLocation[] getLocations(FileSystem fs,
+ FileStatus status) throws IOException {
+ if (status instanceof LocatedFileStatus) {
+ return ((LocatedFileStatus) status).getBlockLocations();
+ } else {
+ return fs.getFileBlockLocations(status, 0, status.getLen());
+ }
+ }
+
}
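The shim additions let split generation ask one question per directory: on
0.23, listLocatedStatus() returns block locations inline with each file and
getLocations() simply unwraps them, while the 0.20 shims fall back to
listStatus() plus one getFileBlockLocations() call per file. A hedged usage
sketch, mirroring how OrcInputFormat consumes the API (the path is
hypothetical):

    // Sketch of consuming the new shim methods.
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class ShimUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path dir = new Path("/warehouse/some_table");  // hypothetical
        FileSystem fs = dir.getFileSystem(conf);
        HadoopShims shims = ShimLoader.getHadoopShims();
        PathFilter hidden = new PathFilter() {  // same rule as hiddenFileFilter
          public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
          }
        };
        Iterator<FileStatus> itr = shims.listLocatedStatus(fs, dir, hidden);
        while (itr.hasNext()) {
          FileStatus file = itr.next();
          // free on 0.23 when file is a LocatedFileStatus; an extra
          // namenode call on the 0.20 shims
          BlockLocation[] blocks = shims.getLocations(fs, file);
          System.out.println(file.getPath() + ": " + blocks.length + " blocks");
        }
      }
    }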
Modified: hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1522541&r1=1522540&r2=1522541&view=diff
==============================================================================
--- hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Sep 12 12:08:42 2013
@@ -25,6 +25,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
import java.util.List;
import javax.security.auth.login.LoginException;
@@ -32,6 +33,7 @@ import javax.security.auth.login.LoginEx
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -468,6 +470,28 @@ public interface HadoopShims {
Class<RecordReader<K, V>> rrClass) throws IOException;
}
+ /**
+ * Get the block locations for the given directory.
+ * @param fs the file system
+ * @param path the directory name to get the status and block locations
+ * @param filter a filter that needs to accept the file (or null)
+ * @return an iterator for the located file status objects
+ * @throws IOException
+ */
+ Iterator<FileStatus> listLocatedStatus(FileSystem fs, Path path,
+ PathFilter filter) throws IOException;
+
+ /**
+ * For file status returned by listLocatedStatus, convert them into a list
+ * of block locations.
+ * @param fs the file system
+ * @param status the file information
+ * @return the block locations of the file
+ * @throws IOException
+ */
+ BlockLocation[] getLocations(FileSystem fs,
+ FileStatus status) throws IOException;
+
public HCatHadoopShims getHCatShim();
public interface HCatHadoopShims {