Posted to commits@hive.apache.org by se...@apache.org on 2015/08/18 00:22:11 UTC
[6/7] hive git commit: HIVE-11542 : port fileId support on shims and splits from llap branch (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3b6825b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3b6825b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3b6825b5
Branch: refs/heads/hbase-metastore
Commit: 3b6825b5b61e943e8e41743f5cbf6d640e0ebdf5
Parents: e059409
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Aug 17 15:16:57 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Aug 17 15:16:57 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 183 ++++++++++++++-----
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 97 +++++++---
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 25 ++-
.../hive/ql/txn/compactor/CompactorMR.java | 13 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 9 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 27 +--
.../hive/ql/io/orc/TestInputOutputFormat.java | 6 +-
.../hadoop/hive/shims/Hadoop20SShims.java | 11 ++
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 66 +++++++
.../apache/hadoop/hive/shims/HadoopShims.java | 15 ++
11 files changed, 348 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9a6781b..da171b1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1024,6 +1024,8 @@ public class HiveConf extends Configuration {
"data is read remotely (from the client or HS2 machine) and sent to all the tasks."),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,
"Max cache size for keeping meta info about orc splits cached in the client."),
+ HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
+ "Include file ID in splits on file systems thaty support it."),
HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
"How many threads orc should use to create splits in parallel."),
HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false,
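
For reference, the new flag is read once per split-generation pass in OrcInputFormat (see the getBoolVar call in the last hunk of that file below). A minimal sketch of checking it, assuming a HiveConf-backed Configuration; the class name is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class FileIdFlagCheck {
  public static void main(String[] args) {
    Configuration conf = new HiveConf();
    // Defaults to true; callers still degrade gracefully when the
    // underlying file system cannot supply file IDs.
    boolean useFileIds = HiveConf.getBoolVar(
        conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
    System.out.println("include file IDs in ORC splits: " + useFileIds);
  }
}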
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c7e0780..30db513 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
@@ -221,7 +224,7 @@ public class AcidUtils {
* Get the list of original files.
* @return the list of original files (eg. 000000_0)
*/
- List<FileStatus> getOriginalFiles();
+ List<HdfsFileStatusWithId> getOriginalFiles();
/**
* Get the list of base and delta directories that are valid and not
@@ -423,6 +426,20 @@ public class AcidUtils {
return false;
}
+ @VisibleForTesting
+ public static Directory getAcidState(Path directory,
+ Configuration conf,
+ ValidTxnList txnList
+ ) throws IOException {
+ return getAcidState(directory, conf, txnList, false);
+ }
+
+ /** State class for getChildState; a method cannot modify two caller variables, so they are bundled here. */
+ private static class TxnBase {
+ private FileStatus status;
+ private long txn;
+ }
+
/**
* Get the ACID state of the given directory. It finds the minimal set of
* base and diff directories. Note that because major compactions don't
@@ -436,51 +453,40 @@ public class AcidUtils {
*/
public static Directory getAcidState(Path directory,
Configuration conf,
- ValidTxnList txnList
+ ValidTxnList txnList,
+ boolean useFileIds
) throws IOException {
FileSystem fs = directory.getFileSystem(conf);
- FileStatus bestBase = null;
- long bestBaseTxn = 0;
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
final List<FileStatus> obsolete = new ArrayList<FileStatus>();
- List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
- hiddenFileFilter);
- for(FileStatus child: children) {
- Path p = child.getPath();
- String fn = p.getName();
- if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
- long txn = parseBase(p);
- if (bestBase == null) {
- bestBase = child;
- bestBaseTxn = txn;
- } else if (bestBaseTxn < txn) {
- obsolete.add(bestBase);
- bestBase = child;
- bestBaseTxn = txn;
- } else {
- obsolete.add(child);
- }
- } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
- ParsedDelta delta = parseDelta(child);
- if (txnList.isTxnRangeValid(delta.minTransaction,
- delta.maxTransaction) !=
- ValidTxnList.RangeResponse.NONE) {
- working.add(delta);
- }
- } else {
- // This is just the directory. We need to recurse and find the actual files. But don't
- // do this until we have determined there is no base. This saves time. Plus,
- // it is possible that the cleaner is running and removing these original files,
- // in which case recursing through them could cause us to get an error.
- originalDirectories.add(child);
+ List<HdfsFileStatusWithId> childrenWithId = null;
+ if (useFileIds) {
+ try {
+ childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+ } catch (Throwable t) {
+ LOG.error("Failed to get files with ID; using regular API", t);
+ useFileIds = false;
+ }
+ }
+ TxnBase bestBase = new TxnBase();
+ final List<HdfsFileStatusWithId> original = new ArrayList<>();
+ if (childrenWithId != null) {
+ for (HdfsFileStatusWithId child : childrenWithId) {
+ getChildState(child.getFileStatus(), child, txnList, working,
+ originalDirectories, original, obsolete, bestBase);
+ }
+ } else {
+ List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory, hiddenFileFilter);
+ for (FileStatus child : children) {
+ getChildState(
+ child, null, txnList, working, originalDirectories, original, obsolete, bestBase);
}
}
- final List<FileStatus> original = new ArrayList<FileStatus>();
- // if we have a base, the original files are obsolete.
- if (bestBase != null) {
+ // If we have a base, the original files are obsolete.
+ if (bestBase.status != null) {
// remove the entries so we don't get confused later and think we should
// use them.
original.clear();
@@ -488,12 +494,12 @@ public class AcidUtils {
// Okay, we're going to need these originals. Recurse through them and figure out what we
// really need.
for (FileStatus origDir : originalDirectories) {
- findOriginals(fs, origDir, original);
+ findOriginals(fs, origDir, original, useFileIds);
}
}
Collections.sort(working);
- long current = bestBaseTxn;
+ long current = bestBase.txn;
int lastStmtId = -1;
for(ParsedDelta next: working) {
if (next.maxTransaction > current) {
@@ -516,7 +522,7 @@ public class AcidUtils {
}
}
- final Path base = bestBase == null ? null : bestBase.getPath();
+ final Path base = bestBase.status == null ? null : bestBase.status.getPath();
LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
@@ -528,7 +534,7 @@ public class AcidUtils {
}
@Override
- public List<FileStatus> getOriginalFiles() {
+ public List<HdfsFileStatusWithId> getOriginalFiles() {
return original;
}
@@ -544,23 +550,100 @@ public class AcidUtils {
};
}
+ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
+ ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
+ List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase) {
+ Path p = child.getPath();
+ String fn = p.getName();
+ if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
+ long txn = parseBase(p);
+ if (bestBase.status == null) {
+ bestBase.status = child;
+ bestBase.txn = txn;
+ } else if (bestBase.txn < txn) {
+ obsolete.add(bestBase.status);
+ bestBase.status = child;
+ bestBase.txn = txn;
+ } else {
+ obsolete.add(child);
+ }
+ } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
+ ParsedDelta delta = parseDelta(child);
+ if (txnList.isTxnRangeValid(delta.minTransaction,
+ delta.maxTransaction) !=
+ ValidTxnList.RangeResponse.NONE) {
+ working.add(delta);
+ }
+ } else if (child.isDir()) {
+ // This is just the directory. We need to recurse and find the actual files. But don't
+ // do this until we have determined there is no base. This saves time. Plus,
+ // it is possible that the cleaner is running and removing these original files,
+ // in which case recursing through them could cause us to get an error.
+ originalDirectories.add(child);
+ } else {
+ original.add(createOriginalObj(childWithId, child));
+ }
+ }
+
+ public static HdfsFileStatusWithId createOriginalObj(
+ HdfsFileStatusWithId childWithId, FileStatus child) {
+ return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
+ }
+
+ private static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId {
+ private FileStatus fs;
+
+ public HdfsFileStatusWithoutId(FileStatus fs) {
+ this.fs = fs;
+ }
+
+ @Override
+ public FileStatus getFileStatus() {
+ return fs;
+ }
+
+ @Override
+ public Long getFileId() {
+ return null;
+ }
+ }
+
/**
- * Find the original files (non-ACID layout) recursively under the partition
- * directory.
+ * Find the original files (non-ACID layout) recursively under the partition directory.
* @param fs the file system
- * @param stat the file/directory to add
+ * @param stat the directory to add
* @param original the list of original files
* @throws IOException
*/
private static void findOriginals(FileSystem fs, FileStatus stat,
- List<FileStatus> original
- ) throws IOException {
- if (stat.isDir()) {
- for(FileStatus child: SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter)) {
- findOriginals(fs, child, original);
+ List<HdfsFileStatusWithId> original, boolean useFileIds) throws IOException {
+ assert stat.isDir();
+ List<HdfsFileStatusWithId> childrenWithId = null;
+ if (useFileIds) {
+ try {
+ childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
+ } catch (Throwable t) {
+ LOG.error("Failed to get files with ID; using regular API", t);
+ useFileIds = false;
+ }
+ }
+ if (childrenWithId != null) {
+ for (HdfsFileStatusWithId child : childrenWithId) {
+ if (child.getFileStatus().isDir()) {
+ findOriginals(fs, child.getFileStatus(), original, useFileIds);
+ } else {
+ original.add(child);
+ }
}
} else {
- original.add(stat);
+ List<FileStatus> children = SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter);
+ for (FileStatus child : children) {
+ if (child.isDir()) {
+ findOriginals(fs, child, original, useFileIds);
+ } else {
+ original.add(createOriginalObj(null, child));
+ }
+ }
}
}
}
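
Taken together, the AcidUtils changes mean getAcidState() now takes a useFileIds hint and getOriginalFiles() yields HdfsFileStatusWithId wrappers rather than raw FileStatus objects, exactly as the Initiator and CompactorMR hunks below consume them. A hedged sketch of such a caller; the partition path and class name are illustrative, and conf/txnList are assumed to be set up elsewhere:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;

class AcidStateExample {
  static void scan(Configuration conf, ValidTxnList txnList) throws IOException {
    AcidUtils.Directory dir = AcidUtils.getAcidState(
        new Path("/warehouse/tbl/part1"), conf, txnList, /* useFileIds */ true);
    for (HdfsFileStatusWithId orig : dir.getOriginalFiles()) {
      // getFileId() is null when the listing fell back to the plain API.
      Long fileId = orig.getFileId();
      long length = orig.getFileStatus().getLen();
    }
  }
}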
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 6ed7872..fd6d2ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -48,12 +48,14 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -73,6 +76,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
@@ -436,26 +440,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class SplitInfo extends ACIDSplitStrategy {
private final Context context;
private final FileSystem fs;
- private final FileStatus file;
+ private final HdfsFileStatusWithId fileWithId;
private final FileInfo fileInfo;
private final boolean isOriginal;
private final List<DeltaMetaData> deltas;
private final boolean hasBase;
SplitInfo(Context context, FileSystem fs,
- FileStatus file, FileInfo fileInfo,
+ HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
boolean isOriginal,
List<DeltaMetaData> deltas,
boolean hasBase, Path dir, boolean[] covered) throws IOException {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fs = fs;
- this.file = file;
+ this.fileWithId = fileWithId;
this.fileInfo = fileInfo;
this.isOriginal = isOriginal;
this.deltas = deltas;
this.hasBase = hasBase;
}
+
+ @VisibleForTesting
+ public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, FileInfo fileInfo,
+ boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
+ boolean[] covered) throws IOException {
+ this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
+ fileInfo, isOriginal, deltas, hasBase, dir, covered);
+ }
}
/**
@@ -465,14 +477,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
Context context;
FileSystem fs;
- List<FileStatus> files;
+ List<HdfsFileStatusWithId> files;
boolean isOriginal;
List<DeltaMetaData> deltas;
Path dir;
boolean[] covered;
- public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
- boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
+ public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
+ List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
+ boolean[] covered) {
this.context = context;
this.dir = dir;
this.fs = fs;
@@ -516,14 +529,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public List<SplitInfo> getSplits() throws IOException {
List<SplitInfo> result = Lists.newArrayList();
- for (FileStatus file : files) {
+ for (HdfsFileStatusWithId file : files) {
FileInfo info = null;
if (context.cacheStripeDetails) {
- info = verifyCachedFileInfo(file);
+ info = verifyCachedFileInfo(file.getFileStatus());
}
// ignore files of 0 length
- if (file.getLen() > 0) {
- result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
+ if (file.getFileStatus().getLen() > 0) {
+ result.add(new SplitInfo(
+ context, fs, file, info, isOriginal, deltas, true, dir, covered));
}
}
return result;
@@ -540,7 +554,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* as opposed to query execution (split generation does not read or cache file footers).
*/
static final class BISplitStrategy extends ACIDSplitStrategy {
- List<FileStatus> fileStatuses;
+ List<HdfsFileStatusWithId> fileStatuses;
boolean isOriginal;
List<DeltaMetaData> deltas;
FileSystem fs;
@@ -548,7 +562,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Path dir;
public BISplitStrategy(Context context, FileSystem fs,
- Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
+ Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
List<DeltaMetaData> deltas, boolean[] covered) {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
@@ -562,11 +576,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public List<OrcSplit> getSplits() throws IOException {
List<OrcSplit> splits = Lists.newArrayList();
- for (FileStatus fileStatus : fileStatuses) {
+ for (HdfsFileStatusWithId file : fileStatuses) {
+ FileStatus fileStatus = file.getFileStatus();
String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
.getHosts();
- OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
- null, isOriginal, true, deltas, -1);
+ OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), file.getFileId(), 0,
+ fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1);
splits.add(orcSplit);
}
@@ -606,7 +621,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (!deltas.isEmpty()) {
for (int b = 0; b < numBuckets; ++b) {
if (!covered[b]) {
- splits.add(new OrcSplit(dir, b, 0, new String[0], null, false, false, deltas, -1));
+ splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1));
}
}
}
@@ -627,21 +642,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final Context context;
private final FileSystem fs;
private final Path dir;
+ private final boolean useFileIds;
- FileGenerator(Context context, FileSystem fs, Path dir) {
+ FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds) {
this.context = context;
this.fs = fs;
this.dir = dir;
+ this.useFileIds = useFileIds;
}
@Override
public SplitStrategy call() throws IOException {
final SplitStrategy splitStrategy;
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
- context.conf, context.transactionList);
+ context.conf, context.transactionList, useFileIds);
List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
Path base = dirInfo.getBaseDirectory();
- List<FileStatus> original = dirInfo.getOriginalFiles();
+ List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
boolean[] covered = new boolean[context.numBuckets];
boolean isOriginal = base == null;
@@ -649,17 +666,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (base != null || !original.isEmpty()) {
// find the base files (original or new style)
- List<FileStatus> children = original;
+ List<HdfsFileStatusWithId> children = original;
if (base != null) {
- children = SHIMS.listLocatedStatus(fs, base,
- AcidUtils.hiddenFileFilter);
+ children = findBaseFiles(base, useFileIds);
}
long totalFileSize = 0;
- for (FileStatus child : children) {
- totalFileSize += child.getLen();
+ for (HdfsFileStatusWithId child : children) {
+ totalFileSize += child.getFileStatus().getLen();
AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
- (child.getPath(), context.conf);
+ (child.getFileStatus().getPath(), context.conf);
int b = opts.getBucket();
// If the bucket is in the valid range, mark it as covered.
// I wish Hive actually enforced bucketing all of the time.
@@ -700,6 +716,24 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return splitStrategy;
}
+
+ private List<HdfsFileStatusWithId> findBaseFiles(
+ Path base, boolean useFileIds) throws IOException {
+ if (useFileIds) {
+ try {
+ return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
+ } catch (Throwable t) {
+ LOG.error("Failed to get files with ID; using regular API", t);
+ }
+ }
+ // Fall back to regular API and create states without ID.
+ List<FileStatus> children = SHIMS.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
+ List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
+ for (FileStatus child : children) {
+ result.add(AcidUtils.createOriginalObj(null, child));
+ }
+ return result;
+ }
}
/**
@@ -709,6 +743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
static final class SplitGenerator implements Callable<List<OrcSplit>> {
private final Context context;
private final FileSystem fs;
+ private final HdfsFileStatusWithId fileWithId;
private final FileStatus file;
private final long blockSize;
private final TreeMap<Long, BlockLocation> locations;
@@ -728,8 +763,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public SplitGenerator(SplitInfo splitInfo) throws IOException {
this.context = splitInfo.context;
this.fs = splitInfo.fs;
- this.file = splitInfo.file;
- this.blockSize = file.getBlockSize();
+ this.fileWithId = splitInfo.fileWithId;
+ this.file = this.fileWithId.getFileStatus();
+ this.blockSize = this.file.getBlockSize();
this.fileInfo = splitInfo.fileInfo;
locations = SHIMS.getLocationsWithOffset(fs, file);
this.isOriginal = splitInfo.isOriginal;
@@ -837,8 +873,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
final double splitRatio = (double) length / (double) fileLen;
final long scaledProjSize = projColsUncompressedSize > 0 ?
(long) (splitRatio * projColsUncompressedSize) : fileLen;
- return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo,
- isOriginal, hasBase, deltas, scaledProjSize);
+ return new OrcSplit(file.getPath(), fileWithId.getFileId(), offset, length, hosts,
+ fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
}
/**
@@ -1020,9 +1056,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
List<Future<?>> splitFutures = Lists.newArrayList();
// multi-threaded file statuses and split strategy
+ boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
for (Path dir : getInputPaths(conf)) {
FileSystem fs = dir.getFileSystem(conf);
- FileGenerator fileGenerator = new FileGenerator(context, fs, dir);
+ FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
pathFutures.add(context.threadPool.submit(fileGenerator));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 8cf4cc0..cc03df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,6 +41,8 @@ import org.apache.hadoop.mapred.FileSplit;
*
*/
public class OrcSplit extends FileSplit {
+ private static final Log LOG = LogFactory.getLog(OrcSplit.class);
+
private ReaderImpl.FileMetaInfo fileMetaInfo;
private boolean hasFooter;
private boolean isOriginal;
@@ -46,7 +50,9 @@ public class OrcSplit extends FileSplit {
private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
private OrcFile.WriterVersion writerVersion;
private long projColsUncompressedSize;
+ private transient Long fileId;
+ static final int HAS_FILEID_FLAG = 8;
static final int BASE_FLAG = 4;
static final int ORIGINAL_FLAG = 2;
static final int FOOTER_FLAG = 1;
@@ -58,10 +64,13 @@ public class OrcSplit extends FileSplit {
super(null, 0, 0, (String[]) null);
}
- public OrcSplit(Path path, long offset, long length, String[] hosts,
+ public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts,
ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
super(path, offset, length, hosts);
+ // We could avoid serializing the file ID and just replace the path with an inode-based path.
+ // However, that breaks a number of things, because Hive later looks up splits by path.
+ this.fileId = fileId;
this.fileMetaInfo = fileMetaInfo;
hasFooter = this.fileMetaInfo != null;
this.isOriginal = isOriginal;
@@ -77,7 +86,8 @@ public class OrcSplit extends FileSplit {
int flags = (hasBase ? BASE_FLAG : 0) |
(isOriginal ? ORIGINAL_FLAG : 0) |
- (hasFooter ? FOOTER_FLAG : 0);
+ (hasFooter ? FOOTER_FLAG : 0) |
+ (fileId != null ? HAS_FILEID_FLAG : 0);
out.writeByte(flags);
out.writeInt(deltas.size());
for(AcidInputFormat.DeltaMetaData delta: deltas) {
@@ -99,6 +109,9 @@ public class OrcSplit extends FileSplit {
footerBuff.limit() - footerBuff.position());
WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
}
+ if (fileId != null) {
+ out.writeLong(fileId.longValue());
+ }
}
@Override
@@ -110,6 +123,7 @@ public class OrcSplit extends FileSplit {
hasFooter = (FOOTER_FLAG & flags) != 0;
isOriginal = (ORIGINAL_FLAG & flags) != 0;
hasBase = (BASE_FLAG & flags) != 0;
+ boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0;
deltas.clear();
int numDeltas = in.readInt();
@@ -134,6 +148,9 @@ public class OrcSplit extends FileSplit {
fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
metadataSize, footerBuff, writerVersion);
}
+ if (hasFileId) {
+ fileId = in.readLong();
+ }
}
ReaderImpl.FileMetaInfo getFileMetaInfo(){
@@ -159,4 +176,8 @@ public class OrcSplit extends FileSplit {
public long getProjectedColumnsUncompressedSize() {
return projColsUncompressedSize;
}
+
+ public Long getFileId() {
+ return fileId;
+ }
}
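
The write/read changes above implement a flag-guarded optional field: a new bit in the flags byte records whether the 8-byte file ID follows, so splits serialized without an ID still round-trip. A self-contained sketch of just that pattern (illustrative class, not OrcSplit itself):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class OptionalIdDemo {
  static final int HAS_FILEID_FLAG = 8;

  static void write(DataOutput out, Long fileId) throws IOException {
    // The flag bit says whether the Long payload follows.
    int flags = (fileId != null ? HAS_FILEID_FLAG : 0);
    out.writeByte(flags);
    if (fileId != null) {
      out.writeLong(fileId.longValue());
    }
  }

  static Long read(DataInput in) throws IOException {
    int flags = in.readByte();
    return (HAS_FILEID_FLAG & flags) != 0 ? in.readLong() : null;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    write(new DataOutputStream(bos), 12345L);
    Long id = read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(id); // prints 12345
  }
}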
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 8e431b2..02fa725 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -133,7 +134,8 @@ public class CompactorMR {
// and discovering that in getSplits is too late as we then have no way to pass it to our
// mapper.
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(
+ new Path(sd.getLocation()), conf, txns, false);
StringableList dirsToSearch = new StringableList();
Path baseDir = null;
if (isMajor) {
@@ -141,12 +143,13 @@ public class CompactorMR {
// partition is just now being converted to ACID.
baseDir = dir.getBaseDirectory();
if (baseDir == null) {
- List<FileStatus> originalFiles = dir.getOriginalFiles();
+ List<HdfsFileStatusWithId> originalFiles = dir.getOriginalFiles();
if (!(originalFiles == null) && !(originalFiles.size() == 0)) {
// There are original format files
- for (FileStatus stat : originalFiles) {
- dirsToSearch.add(stat.getPath());
- LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search");
+ for (HdfsFileStatusWithId stat : originalFiles) {
+ Path path = stat.getFileStatus().getPath();
+ dirsToSearch.add(path);
+ LOG.debug("Adding original file " + path + " to dirs to search");
}
// Set base to the location so that the input format reads the original files.
baseDir = new Path(sd.getLocation());
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 73715c6..9bf725d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -223,7 +224,7 @@ public class Initiator extends CompactorThread {
boolean noBase = false;
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false);
Path base = dir.getBaseDirectory();
long baseSize = 0;
FileStatus stat = null;
@@ -236,9 +237,9 @@ public class Initiator extends CompactorThread {
baseSize = sumDirSize(fs, base);
}
- List<FileStatus> originals = dir.getOriginalFiles();
- for (FileStatus origStat : originals) {
- baseSize += origStat.getLen();
+ List<HdfsFileStatusWithId> originals = dir.getOriginalFiles();
+ for (HdfsFileStatusWithId origStat : originals) {
+ baseSize += origStat.getFileStatus().getLen();
}
long deltaSize = 0;
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f8ded12..b6ba862 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.junit.Test;
import java.util.List;
@@ -102,13 +103,14 @@ public class TestAcidUtils {
assertEquals(null, dir.getBaseDirectory());
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
- List<FileStatus> result = dir.getOriginalFiles();
+ List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
assertEquals(5, result.size());
- assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString());
- assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString());
- assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString());
+ assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/subdir/000000_0",
+ result.get(4).getFileStatus().getPath().toString());
}
@Test
@@ -136,13 +138,14 @@ public class TestAcidUtils {
obsolete.get(0).getPath().toString());
assertEquals("mock:/tbl/part1/delta_029_029",
obsolete.get(1).getPath().toString());
- List<FileStatus> result = dir.getOriginalFiles();
+ List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
assertEquals(5, result.size());
- assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString());
- assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString());
- assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString());
- assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString());
- assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString());
+ assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+ assertEquals("mock:/tbl/part1/subdir/000000_0",
+ result.get(4).getFileStatus().getPath().toString());
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
assertEquals(2, deltas.size());
AcidUtils.ParsedDelta delt = deltas.get(0);
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 0c12c89..547e799 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -483,7 +483,7 @@ public class TestInputOutputFormat {
final OrcInputFormat.Context context = new OrcInputFormat.Context(
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
- context, fs, new MockPath(fs, "mock:/a/b"));
+ context, fs, new MockPath(fs, "mock:/a/b"), false);
final SplitStrategy splitStrategy = gen.call();
assertTrue(
String.format(
@@ -507,7 +507,7 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/b/part-04", 1000, new byte[0]));
OrcInputFormat.FileGenerator gen =
new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"));
+ new MockPath(fs, "mock:/a/b"), false);
SplitStrategy splitStrategy = gen.call();
assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
@@ -520,7 +520,7 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
gen = new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"));
+ new MockPath(fs, "mock:/a/b"), false);
splitStrategy = gen.call();
assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
----------------------------------------------------------------------
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index ffffcb7..a56309f 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -722,4 +722,15 @@ public class Hadoop20SShims extends HadoopShimsSecure {
Token<?> fsToken = fs.getDelegationToken(uname);
cred.addToken(fsToken.getService(), fsToken);
}
+
+ @Override
+ public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+ FileSystem fs, Path path, PathFilter filter) throws IOException {
+ throw new UnsupportedOperationException("Not supported on this old Hadoop version");
+ }
+
+ @Override
+ public long getFileId(FileSystem fs, String path) throws IOException {
+ throw new UnsupportedOperationException("Not supported on this old Hadoop version");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9eae0ac..e5be8d6 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -61,10 +61,13 @@ import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.io.LongWritable;
@@ -662,6 +665,64 @@ public class Hadoop23Shims extends HadoopShimsSecure {
return result;
}
+ private static final class HdfsFileStatusWithIdImpl implements HdfsFileStatusWithId {
+ private final LocatedFileStatus lfs;
+ private final long fileId;
+
+ public HdfsFileStatusWithIdImpl(LocatedFileStatus lfs, long fileId) {
+ this.lfs = lfs;
+ this.fileId = fileId;
+ }
+
+ @Override
+ public FileStatus getFileStatus() {
+ return lfs;
+ }
+
+ @Override
+ public Long getFileId() {
+ return fileId;
+ }
+ }
+
+ @Override
+ public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+ FileSystem fs, Path p, PathFilter filter) throws IOException {
+ DistributedFileSystem dfs = ensureDfs(fs);
+ DFSClient dfsc = dfs.getClient();
+ final String src = p.toUri().getPath();
+ DirectoryListing current = dfsc.listPaths(src,
+ org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true);
+ if (current == null) { // the directory does not exist
+ throw new FileNotFoundException("File " + p + " does not exist.");
+ }
+ final URI fsUri = fs.getUri();
+ List<HdfsFileStatusWithId> result = new ArrayList<HdfsFileStatusWithId>(
+ current.getPartialListing().length);
+ while (current != null) {
+ org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = current.getPartialListing();
+ for (int i = 0; i < hfss.length; ++i) {
+ HdfsLocatedFileStatus next = (HdfsLocatedFileStatus)(hfss[i]);
+ if (filter != null) {
+ Path filterPath = next.getFullPath(p).makeQualified(fsUri, null);
+ if (!filter.accept(filterPath)) continue;
+ }
+ LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
+ result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId()));
+ }
+ current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
+ }
+ return result;
+ }
+
+ private DistributedFileSystem ensureDfs(FileSystem fs) {
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass());
+ }
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ return dfs;
+ }
+
@Override
public BlockLocation[] getLocations(FileSystem fs,
FileStatus status) throws IOException {
@@ -1352,4 +1413,9 @@ public class Hadoop23Shims extends HadoopShimsSecure {
// Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS.
fs.addDelegationTokens(uname, cred);
}
+
+ @Override
+ public long getFileId(FileSystem fs, String path) throws IOException {
+ return ensureDfs(fs).getClient().getFileInfo(path).getFileId();
+ }
}
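
Note that getFileId() resolves the inode ID through DFSClient.getFileInfo(), so it only works against a DistributedFileSystem; the Hadoop20S shim throws instead. A usage sketch through the shim loader, with an illustrative path and class name:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

class FileIdLookup {
  static long lookup(String path) throws IOException {
    HadoopShims shims = ShimLoader.getHadoopShims();
    FileSystem fs = new Path(path).getFileSystem(new Configuration());
    // Throws UnsupportedOperationException on pre-2.x shims and for
    // non-DistributedFileSystem instances (see ensureDfs above).
    return shims.getFileId(fs, path);
  }
}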
http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 74785e5..2b6f322 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -256,6 +256,10 @@ public interface HadoopShims {
List<FileStatus> listLocatedStatus(FileSystem fs, Path path,
PathFilter filter) throws IOException;
+
+ List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+ FileSystem fs, Path path, PathFilter filter) throws IOException;
+
/**
* For file status returned by listLocatedStatus, convert them into a list
* of block locations.
@@ -316,6 +320,11 @@ public interface HadoopShims {
public void debugLog();
}
+ public interface HdfsFileStatusWithId {
+ public FileStatus getFileStatus();
+ public Long getFileId();
+ }
+
public HCatHadoopShims getHCatShim();
public interface HCatHadoopShims {
@@ -731,4 +740,10 @@ public interface HadoopShims {
* @throws IOException If an error occurred on adding the token.
*/
public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException;
+
+ /**
+ * Gets file ID. Only supported on hadoop-2.
+ * @return inode ID of the file.
+ */
+ long getFileId(FileSystem fs, String path) throws IOException;
}