You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/24 07:25:52 UTC
svn commit: r1535281 - in /hive/branches/tez:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/java/org/apache/hadoop/hive/ql/log/
Author: gunther
Date: Thu Oct 24 05:25:51 2013
New Revision: 1535281
URL: http://svn.apache.org/r1535281
Log:
HIVE-5639: Allow caching of Orc footers in Tez AM (Siddharth Seth via Gunther Hagleitner)
Modified:
hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Oct 24 05:25:51 2013
@@ -516,6 +516,10 @@ public class HiveConf extends Configurat
HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+ HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
+ HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
+ HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
+
HIVESKEWJOIN("hive.optimize.skewjoin", false),
HIVECONVERTJOIN("hive.auto.convert.join", true),
HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Oct 24 05:25:51 2013
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -58,6 +60,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A MapReduce/Hive input format for ORC files.
*/
@@ -69,6 +75,7 @@ public class OrcInputFormat implements
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+
private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -248,11 +255,15 @@ public class OrcInputFormat implements
//If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
reader = OrcFile.createReader(fs, path);
} else {
- //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+ //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
//constructor
OrcSplit orcSplit = (OrcSplit) fSplit;
- FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
- reader = OrcFile.createReader(fs, path, fMetaInfo);
+ if (orcSplit.hasFooter()) {
+ FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+ reader = OrcFile.createReader(fs, path, fMetaInfo);
+ } else {
+ reader = OrcFile.createReader(fs, path);
+ }
}
return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
}
@@ -311,13 +322,17 @@ public class OrcInputFormat implements
* the different worker threads.
*/
static class Context {
- private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
+ private static Cache<Path, FileInfo> footerCache;
+ private final ExecutorService threadPool;
private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
private final List<Throwable> errors = new ArrayList<Throwable>();
private final HadoopShims shims = ShimLoader.getHadoopShims();
- private final Configuration conf;
private final long maxSize;
private final long minSize;
+ private final boolean footerInSplits;
+ private final boolean cacheStripeDetails;
+ private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
+ private final AtomicInteger numFilesCounter = new AtomicInteger(0);
/**
* A count of the number of threads that may create more work for the
@@ -326,9 +341,24 @@ public class OrcInputFormat implements
private int schedulers = 0;
Context(Configuration conf) {
- this.conf = conf;
minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+ footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+ int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
+ ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
+ int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+
+ // A non-positive cache size disables footer caching.
+ cacheStripeDetails = (cacheStripeDetailsSize > 0);
+
+ // Daemon threads, so an idle split-computation pool cannot keep the JVM alive.
+ threadPool = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ORC_GET_SPLITS #%d").build());
+
+ // footerCache is static: it is shared by every Context in this JVM and is
+ // built once, by the first Context that enables caching, so later changes to
+ // the size or thread-count settings do not reshape an existing cache.
+ synchronized (Context.class) {
+ if (footerCache == null && cacheStripeDetails) {
+ // maximumSize bounds the cache by the configured entry count (initialCapacity
+ // alone does not); softValues also lets the GC drop entries under memory pressure.
+ footerCache = CacheBuilder.newBuilder().concurrencyLevel(numThreads)
+ .initialCapacity(cacheStripeDetailsSize).maximumSize(cacheStripeDetailsSize)
+ .softValues().build();
+ }
+ }
}
int getSchedulers() {
@@ -413,16 +443,22 @@ public class OrcInputFormat implements
@Override
public void run() {
try {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
hiddenFileFilter);
while (itr.hasNext()) {
FileStatus file = itr.next();
if (!file.isDir()) {
- context.schedule(new SplitGenerator(context, fs, file));
+ FileInfo fileInfo = null;
+ if (context.cacheStripeDetails) {
+ fileInfo = verifyCachedFileInfo(file);
+ }
+ context.schedule(new SplitGenerator(context, fs, file, fileInfo));
}
}
// mark the fact that we are done
context.decrementSchedulers();
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_BLOCK_LOCATIONS);
} catch (Throwable th) {
context.decrementSchedulers();
synchronized (context.errors) {
@@ -430,6 +466,34 @@ public class OrcInputFormat implements
}
}
}
+
+ /**
+ * Looks up cached split-generation details for a file.
+ *
+ * A cached entry is returned only if the file's modification time and length
+ * still match the values recorded in it; otherwise the stale entry is evicted
+ * from the footer cache.
+ *
+ * @param file status of the ORC file about to be scheduled for split generation
+ * @return the still-valid cached FileInfo, or null on a miss or a stale entry
+ */
+ private FileInfo verifyCachedFileInfo(FileStatus file) {
+ context.numFilesCounter.incrementAndGet();
+ Path path = file.getPath();
+ FileInfo fileInfo = Context.footerCache.getIfPresent(path);
+ if (fileInfo == null) {
+ // A miss happens for every file on a cold cache, so it is logged at debug
+ // only instead of emitting one INFO line per input file.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Info not cached for path: " + path);
+ }
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Info cached for path: " + path);
+ }
+ if (fileInfo.modificationTime == file.getModificationTime() && fileInfo.size == file.getLen()) {
+ // Cached copy is valid
+ context.cacheHitCounter.incrementAndGet();
+ return fileInfo;
+ }
+ // The file changed since it was cached: evict the stale entry.
+ Context.footerCache.invalidate(path);
+ LOG.info("Meta-Info for : " + path + " changed. CachedModificationTime: "
+ + fileInfo.modificationTime + ", CurrentModificationTime: "
+ + file.getModificationTime()
+ + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + file.getLen());
+ return null;
+ }
}
/**
@@ -442,13 +506,18 @@ public class OrcInputFormat implements
private final FileStatus file;
private final long blockSize;
private final BlockLocation[] locations;
+ private final FileInfo fileInfo;
+ private Iterable<StripeInformation> stripes;
+ private FileMetaInfo fileMetaInfo;
+
SplitGenerator(Context context, FileSystem fs,
- FileStatus file) throws IOException {
+ FileStatus file, FileInfo fileInfo) throws IOException {
this.context = context;
this.fs = fs;
this.file = file;
this.blockSize = file.getBlockSize();
+ // Validated footer-cache entry supplied by the directory scanner, or null when
+ // caching is disabled or the entry was missing/stale and the file must be read.
+ this.fileInfo = fileInfo;
locations = context.shims.getLocations(fs, file);
}
@@ -547,15 +616,15 @@ public class OrcInputFormat implements
public void run() {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
try {
- Reader orcReader = OrcFile.createReader(fs, file.getPath());
+ populateAndCacheStripeDetails();
long currentOffset = -1;
long currentLength = 0;
- for(StripeInformation stripe: orcReader.getStripes()) {
+ for(StripeInformation stripe: stripes) {
// if we are working on a stripe, over the min stripe size, and
// crossed a block boundary, cut the input split here.
if (currentOffset != -1 && currentLength > context.minSize &&
(currentOffset / blockSize != stripe.getOffset() / blockSize)) {
- createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+ createSplit(currentOffset, currentLength, fileMetaInfo);
currentOffset = -1;
}
// if we aren't building a split, start a new one.
@@ -566,12 +635,12 @@ public class OrcInputFormat implements
currentLength += stripe.getLength();
}
if (currentLength >= context.maxSize) {
- createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+ createSplit(currentOffset, currentLength, fileMetaInfo);
currentOffset = -1;
}
}
if (currentOffset != -1) {
- createSplit(currentOffset, currentLength, orcReader.getFileMetaInfo());
+ createSplit(currentOffset, currentLength, fileMetaInfo);
}
} catch (Throwable th) {
synchronized (context.errors) {
@@ -580,12 +649,48 @@ public class OrcInputFormat implements
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CREATE_ORC_SPLITS);
}
+
+
+
+
+ /**
+ * Fills in {@link #stripes} and {@link #fileMetaInfo} for this file.
+ *
+ * A validated cache entry is reused when one was supplied; otherwise the file
+ * is opened, and the result is added to the footer cache when caching is
+ * enabled. {@link #fileMetaInfo} is non-null only when footers are to be
+ * included in splits, on both the cached and the uncached path.
+ *
+ * @throws IOException if the ORC file cannot be opened; it propagates to the
+ * caller instead of leaving {@link #stripes} null for the split loop
+ */
+ private void populateAndCacheStripeDetails() throws IOException {
+ if (fileInfo != null) {
+ stripes = fileInfo.stripeInfos;
+ // The entry may have been cached by a run that did not include footers in
+ // splits (footerInSplits can differ between runs); if this run needs them,
+ // read the footer now and remember it in the cached entry.
+ if (context.footerInSplits && fileInfo.fileMetaInfo == null) {
+ Reader footerReader = OrcFile.createReader(fs, file.getPath());
+ fileInfo.fileMetaInfo = footerReader.getFileMetaInfo();
+ }
+ // Use the footer for this run's splits only when footers are enabled.
+ fileMetaInfo = context.footerInSplits ? fileInfo.fileMetaInfo : null;
+ return;
+ }
+ Reader orcReader = OrcFile.createReader(fs, file.getPath());
+ stripes = orcReader.getStripes();
+ fileMetaInfo = context.footerInSplits ? orcReader.getFileMetaInfo() : null;
+ if (context.cacheStripeDetails) {
+ // Populate into cache.
+ Context.footerCache.put(file.getPath(),
+ new FileInfo(file.getModificationTime(), file.getLen(), stripes, fileMetaInfo));
+ }
+ }
+
}
@Override
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
// use threads to resolve directories into splits
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
Context context = new Context(job);
for(Path dir: getInputPaths(job)) {
FileSystem fs = dir.getFileSystem(job);
@@ -607,6 +712,32 @@ public class OrcInputFormat implements
}
InputSplit[] result = new InputSplit[context.splits.size()];
context.splits.toArray(result);
+ if (context.cacheStripeDetails) {
+ LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+ + context.numFilesCounter.get());
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
return result;
}
-}
+
+ /**
+ * Split-generation details for one ORC file, as stored in the footer cache.
+ *
+ * The modification time and length record which version of the file the entry
+ * was built from; the stripe list and the optional footer metadata are reused
+ * only while those still match the file's current status.
+ */
+ private static class FileInfo {
+ final long modificationTime;
+ final long size;
+ final Iterable<StripeInformation> stripeInfos;
+ // May be filled in after the entry is already in the shared static cache, by a
+ // later run that includes footers in splits; volatile so that late write is
+ // visible to the other split-generation threads reading the same entry.
+ volatile FileMetaInfo fileMetaInfo;
+
+ FileInfo(long modificationTime, long size, Iterable<StripeInformation> stripeInfos,
+ FileMetaInfo fileMetaInfo) {
+ this.modificationTime = modificationTime;
+ this.size = size;
+ this.stripeInfos = stripeInfos;
+ this.fileMetaInfo = fileMetaInfo;
+ }
+ }
+}
\ No newline at end of file
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Thu Oct 24 05:25:51 2013
@@ -19,6 +19,7 @@ import org.apache.hadoop.mapred.FileSpli
*/
public class OrcSplit extends FileSplit {
private Reader.FileMetaInfo fileMetaInfo;
+ private boolean hasFooter;
protected OrcSplit(){
//The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
@@ -31,6 +32,7 @@ public class OrcSplit extends FileSplit
FileMetaInfo fileMetaInfo) {
super(path, offset, length, hosts);
this.fileMetaInfo = fileMetaInfo;
+ hasFooter = this.fileMetaInfo != null;
}
@Override
@@ -38,16 +40,22 @@ public class OrcSplit extends FileSplit
//serialize path, offset, length using FileSplit
super.write(out);
- //serialize FileMetaInfo fields
- Text.writeString(out, fileMetaInfo.compressionType);
- WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
-
- //serialize FileMetaInfo field footer
- ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
- footerBuff.reset();
- //write length of buffer
- WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
- out.write(footerBuff.array(), footerBuff.position(), footerBuff.limit() - footerBuff.position());
+ // Whether footer information follows.
+ out.writeBoolean(hasFooter);
+
+ if (hasFooter) {
+ // serialize FileMetaInfo fields
+ Text.writeString(out, fileMetaInfo.compressionType);
+ WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+
+ // serialize FileMetaInfo field footer
+ ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+ footerBuff.reset();
+ // write length of buffer
+ WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+ out.write(footerBuff.array(), footerBuff.position(),
+ footerBuff.limit() - footerBuff.position());
+ }
}
@Override
@@ -55,20 +63,27 @@ public class OrcSplit extends FileSplit
//deserialize path, offset, length using FileSplit
super.readFields(in);
- //deserialize FileMetaInfo fields
- String compressionType = Text.readString(in);
- int bufferSize = WritableUtils.readVInt(in);
-
- //deserialize FileMetaInfo field footer
- int footerBuffSize = WritableUtils.readVInt(in);
- ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
- in.readFully(footerBuff.array(), 0, footerBuffSize);
+ hasFooter = in.readBoolean();
+
+ if (hasFooter) {
+ // deserialize FileMetaInfo fields
+ String compressionType = Text.readString(in);
+ int bufferSize = WritableUtils.readVInt(in);
+
+ // deserialize FileMetaInfo field footer
+ int footerBuffSize = WritableUtils.readVInt(in);
+ ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+ in.readFully(footerBuff.array(), 0, footerBuffSize);
- fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+ fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+ }
}
public FileMetaInfo getFileMetaInfo(){
return fileMetaInfo;
}
+ /**
+ * Reports whether this split carries the file's footer metadata.
+ *
+ * @return true when {@link #getFileMetaInfo()} holds footer metadata that a
+ * reader can pass to OrcFile.createReader; false when the file must be opened
+ * without it
+ */
+ public boolean hasFooter() {
+ return this.hasFooter;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Thu Oct 24 05:25:51 2013
@@ -153,11 +153,15 @@ public class VectorizedOrcInputFormat ex
//If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
reader = OrcFile.createReader(fs, path);
} else {
- //We have OrcSplit, which has footer metadata cached, so used the appropriate reader
+ //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
//constructor
OrcSplit orcSplit = (OrcSplit) fSplit;
- FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
- reader = OrcFile.createReader(fs, path, fMetaInfo);
+ if (orcSplit.hasFooter()) {
+ FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
+ reader = OrcFile.createReader(fs, path, fMetaInfo);
+ } else {
+ reader = OrcFile.createReader(fs, path);
+ }
}
return new VectorizedOrcRecordReader(reader, conf, fSplit);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1535281&r1=1535280&r2=1535281&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Thu Oct 24 05:25:51 2013
@@ -65,6 +65,8 @@ public class PerfLogger {
public static final String LOAD_HASHTABLE = "LoadHashtable";
public static final String INIT_ORC_RECORD_READER = "OrcRecordReaderInit";
public static final String CREATE_ORC_SPLITS = "OrcCreateSplits";
+ public static final String ORC_GET_SPLITS = "OrcGetSplits";
+ public static final String ORC_GET_BLOCK_LOCATIONS = "OrcGetBlockLocations";
protected static final ThreadLocal<PerfLogger> perfLogger = new ThreadLocal<PerfLogger>();