Posted to commits@hive.apache.org by se...@apache.org on 2016/03/16 02:47:02 UTC
[1/2] hive git commit: HIVE-11675 : make use of file footer PPD API in ETL strategy or separate strategy (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 26b5c7b56 -> 868db42a6
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 80208c2..6d27f55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.HiveObjectType;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -3485,7 +3486,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
public Iterable<Map.Entry<Long, ByteBuffer>> getFileMetadata(
- List<Long> fileIds, Configuration conf) throws HiveException {
+ List<Long> fileIds) throws HiveException {
try {
return getMSC().getFileMetadata(fileIds);
} catch (TException e) {
@@ -3493,6 +3494,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
+ public Iterable<Map.Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(
+ List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException {
+ try {
+ return getMSC().getFileMetadataBySarg(fileIds, sarg, doGetFooters);
+ } catch (TException e) {
+ throw new HiveException(e);
+ }
+ }
+
public void clearFileMetadata(List<Long> fileIds) throws HiveException {
try {
getMSC().clearFileMetadata(fileIds);
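For reference, a minimal usage sketch of the new getFileMetadataByExpr wrapper above; the helper class, the file IDs, and the serialized SARG are placeholders for illustration, not part of this patch. An unset include-bitset in a result entry means the file was eliminated by metastore-side PPD; files unknown to the metastore are simply absent from the iteration.

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class GetFileMetadataByExprSketch {
  // Hypothetical helper; fileIds and serializedSarg would come from ORC split generation.
  static void printPpdResults(List<Long> fileIds, ByteBuffer serializedSarg) throws HiveException {
    Iterable<Map.Entry<Long, MetadataPpdResult>> results =
        Hive.get().getFileMetadataByExpr(fileIds, serializedSarg, /* doGetFooters */ false);
    for (Map.Entry<Long, MetadataPpdResult> e : results) {
      MetadataPpdResult mpr = e.getValue();
      // Unset include-bitset => the whole file was eliminated by PPD in the metastore.
      System.out.println("file " + e.getKey() + ", kept: " + mpr.isSetIncludeBitset()
          + ", footer included: " + mpr.isSetMetadata());
    }
  }
}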
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f824e18..1a64f3a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -65,12 +65,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -1113,7 +1113,7 @@ public class TestInputOutputFormat {
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
OrcInputFormat.SplitGenerator splitter =
new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
- AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
+ fs.getFileStatus(new Path("/a/file")), null, true,
new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null, true);
OrcSplit result = splitter.createSplit(0, 200, null);
assertEquals(0, result.getStart());
@@ -1154,7 +1154,7 @@ public class TestInputOutputFormat {
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
OrcInputFormat.SplitGenerator splitter =
new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
- AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
+ fs.getFileStatus(new Path("/a/file")), null, true,
new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null, true);
List<OrcSplit> results = splitter.call();
OrcSplit result = results.get(0);
@@ -1177,7 +1177,7 @@ public class TestInputOutputFormat {
HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 0);
context = new OrcInputFormat.Context(conf);
splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
- AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
+ fs.getFileStatus(new Path("/a/file")), null, true,
new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null, true);
results = splitter.call();
for(int i=0; i < stripeSizes.length; ++i) {
@@ -2165,7 +2165,7 @@ public class TestInputOutputFormat {
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- OrcInputFormat.generateSplitsInfo(conf, -1);
+ OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1, null));
return null;
}
});
@@ -2184,7 +2184,7 @@ public class TestInputOutputFormat {
}
assertEquals(1, OrcInputFormat.Context.getCurrentThreadPoolSize());
FileInputFormat.setInputPaths(conf, "mock:/ugi/2");
- List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, -1);
+ List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, new Context(conf, -1, null));
assertEquals(1, splits.size());
} finally {
MockFileSystem.clearGlobalFiles();
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
index 7a93b54..62a0ab0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
@@ -18,18 +18,34 @@
package org.apache.hadoop.hive.ql.io.orc;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.hbase.MetadataStore;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionExpressionForMetastore;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -43,6 +59,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -50,6 +67,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@@ -76,7 +95,7 @@ public class TestOrcSplitElimination {
JobConf conf;
FileSystem fs;
- Path testFilePath;
+ Path testFilePath, testFilePath2;
@Rule
public TestName testCaseName = new TestName();
@@ -94,17 +113,15 @@ public class TestOrcSplitElimination {
fs = FileSystem.getLocal(conf);
testFilePath = new Path(workDir, "TestOrcFile." +
testCaseName.getMethodName() + ".orc");
+ testFilePath2 = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".2.orc");
fs.delete(testFilePath, false);
+ fs.delete(testFilePath2, false);
}
@Test
public void testSplitEliminationSmallMaxSplit() throws Exception {
- ObjectInspector inspector;
- synchronized (TestOrcFile.class) {
- inspector = ObjectInspectorFactory
- .getReflectionObjectInspector(AllTypesRow.class,
- ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- }
+ ObjectInspector inspector = createIO();
Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
100000, CompressionKind.NONE, 10000, 10000);
writeData(writer);
@@ -116,13 +133,10 @@ public class TestOrcSplitElimination {
GenericUDF udf = new GenericUDFOPEqualOrLessThan();
List<ExprNodeDesc> childExpr = Lists.newArrayList();
- ExprNodeColumnDesc col = new ExprNodeColumnDesc(Long.class, "userid", "T", false);
- ExprNodeConstantDesc con = new ExprNodeConstantDesc(100);
- childExpr.add(col);
- childExpr.add(con);
- ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- String sargStr = SerializationUtilities.serializeExpression(en);
- conf.set("hive.io.filter.expr.serialized", sargStr);
+ ExprNodeConstantDesc con;
+ ExprNodeGenericFuncDesc en;
+ String sargStr;
+ createTestSarg(inspector, udf, childExpr);
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(5, splits.length);
@@ -177,12 +191,7 @@ public class TestOrcSplitElimination {
@Test
public void testSplitEliminationLargeMaxSplit() throws Exception {
- ObjectInspector inspector;
- synchronized (TestOrcFile.class) {
- inspector = ObjectInspectorFactory
- .getReflectionObjectInspector(AllTypesRow.class,
- ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- }
+ ObjectInspector inspector = createIO();
Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
100000, CompressionKind.NONE, 10000, 10000);
writeData(writer);
@@ -194,13 +203,10 @@ public class TestOrcSplitElimination {
GenericUDF udf = new GenericUDFOPEqualOrLessThan();
List<ExprNodeDesc> childExpr = Lists.newArrayList();
- ExprNodeColumnDesc col = new ExprNodeColumnDesc(Long.class, "userid", "T", false);
- ExprNodeConstantDesc con = new ExprNodeConstantDesc(100);
- childExpr.add(col);
- childExpr.add(con);
- ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr);
- String sargStr = SerializationUtilities.serializeExpression(en);
- conf.set("hive.io.filter.expr.serialized", sargStr);
+ ExprNodeConstantDesc con;
+ ExprNodeGenericFuncDesc en;
+ String sargStr;
+ createTestSarg(inspector, udf, childExpr);
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(2, splits.length);
@@ -266,12 +272,7 @@ public class TestOrcSplitElimination {
@Test
public void testSplitEliminationComplexExpr() throws Exception {
- ObjectInspector inspector;
- synchronized (TestOrcFile.class) {
- inspector = ObjectInspectorFactory
- .getReflectionObjectInspector(AllTypesRow.class,
- ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- }
+ ObjectInspector inspector = createIO();
Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
100000, CompressionKind.NONE, 10000, 10000);
writeData(writer);
@@ -385,6 +386,342 @@ public class TestOrcSplitElimination {
assertEquals(1, splits.length);
}
+ private static class OrcInputFormatForTest extends OrcInputFormat {
+ public static void clearLocalCache() {
+ OrcInputFormat.Context.clearLocalCache();
+ }
+ static MockExternalCaches caches = new MockExternalCaches();
+ @Override
+ protected ExternalFooterCachesByConf createExternalCaches() {
+ return caches;
+ }
+ }
+
+ private static class MockExternalCaches
+ implements ExternalFooterCachesByConf, ExternalFooterCachesByConf.Cache, MetadataStore {
+ private static class MockItem {
+ ByteBuffer data;
+ ByteBuffer[] extraCols;
+ ByteBuffer[] extraData;
+
+ @Override
+ public String toString() {
+ return (data == null ? 0 : data.remaining()) + " bytes"
+ + (extraCols == null ? "" : ("; " + extraCols.length + " extras"));
+ }
+ }
+ private final Map<Long, MockItem> cache = new ConcurrentHashMap<>();
+ private final OrcFileMetadataHandler handler = new OrcFileMetadataHandler();
+ private final AtomicInteger putCount = new AtomicInteger(0),
+ getCount = new AtomicInteger(0), getHitCount = new AtomicInteger(0),
+ getByExprCount = new AtomicInteger(0), getHitByExprCount = new AtomicInteger();
+
+ public void resetCounts() {
+ getByExprCount.set(0);
+ getCount.set(0);
+ putCount.set(0);
+ getHitCount.set(0);
+ getHitByExprCount.set(0);
+ }
+
+ @Override
+ public Cache getCache(HiveConf conf) throws IOException {
+ handler.configure(conf, new PartitionExpressionForMetastore(), this);
+ return this;
+ }
+
+ @Override
+ public Iterator<Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(
+ List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException {
+ getByExprCount.incrementAndGet();
+ ByteBuffer[] metadatas = new ByteBuffer[fileIds.size()];
+ ByteBuffer[] ppdResults = new ByteBuffer[fileIds.size()];
+ boolean[] eliminated = new boolean[fileIds.size()];
+ try {
+ byte[] bb = new byte[sarg.remaining()];
+ System.arraycopy(sarg.array(), sarg.arrayOffset(), bb, 0, sarg.remaining());
+ handler.getFileMetadataByExpr(fileIds, bb, metadatas, ppdResults, eliminated);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ Map<Long, MetadataPpdResult> result = new HashMap<>();
+ for (int i = 0; i < metadatas.length; ++i) {
+ long fileId = fileIds.get(i);
+ ByteBuffer metadata = metadatas[i];
+ if (metadata == null) continue;
+ getHitByExprCount.incrementAndGet();
+ metadata = eliminated[i] ? null : metadata;
+ MetadataPpdResult mpr = new MetadataPpdResult();
+ ByteBuffer bitset = eliminated[i] ? null : ppdResults[i];
+ mpr.setMetadata(doGetFooters ? metadata : null);
+ mpr.setIncludeBitset(bitset);
+ result.put(fileId, mpr);
+ }
+ return result.entrySet().iterator();
+ }
+
+ @Override
+ public void clearFileMetadata(List<Long> fileIds) throws HiveException {
+ for (Long id : fileIds) {
+ cache.remove(id);
+ }
+ }
+
+ @Override
+ public Iterator<Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds)
+ throws HiveException {
+ getCount.incrementAndGet();
+ HashMap<Long, ByteBuffer> result = new HashMap<>();
+ for (Long id : fileIds) {
+ MockItem mi = cache.get(id);
+ if (mi == null) continue;
+ getHitCount.incrementAndGet();
+ result.put(id, mi.data);
+ }
+ return result.entrySet().iterator();
+ }
+
+ @Override
+ public void putFileMetadata(ArrayList<Long> fileIds,
+ ArrayList<ByteBuffer> values) throws HiveException {
+ putCount.incrementAndGet();
+ ByteBuffer[] addedCols = handler.createAddedCols();
+ ByteBuffer[][] addedVals = null;
+ if (addedCols != null) {
+ addedVals = handler.createAddedColVals(values);
+ }
+ try {
+ storeFileMetadata(fileIds, values, addedCols, addedVals);
+ } catch (IOException | InterruptedException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ // MetadataStore
+ @Override
+ public void getFileMetadata(List<Long> fileIds, ByteBuffer[] result) throws IOException {
+ for (int i = 0; i < fileIds.size(); ++i) {
+ MockItem mi = cache.get(fileIds.get(i));
+ result[i] = (mi == null ? null : mi.data);
+ }
+ }
+
+ @Override
+ public void storeFileMetadata(List<Long> fileIds, List<ByteBuffer> metadataBuffers,
+ ByteBuffer[] addedCols, ByteBuffer[][] addedVals)
+ throws IOException, InterruptedException {
+ for (int i = 0; i < fileIds.size(); ++i) {
+ ByteBuffer value = (metadataBuffers != null) ? metadataBuffers.get(i) : null;
+ ByteBuffer[] av = addedVals == null ? null : addedVals[i];
+ storeFileMetadata(fileIds.get(i), value, addedCols, av);
+ }
+ }
+
+ @Override
+ public void storeFileMetadata(long fileId, ByteBuffer metadata,
+ ByteBuffer[] addedCols, ByteBuffer[] addedVals) throws IOException, InterruptedException {
+ if (metadata == null) {
+ cache.remove(metadata);
+ return;
+ }
+ MockItem mi = new MockItem();
+ mi.data = metadata;
+ if (addedVals != null) {
+ mi.extraCols = addedCols;
+ mi.extraData = addedVals;
+ }
+ cache.put(fileId, mi);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestOrcSplitElimination.class);
+
+ @Test
+ public void testExternalFooterCache() throws Exception {
+ testFooterExternalCacheImpl(false);
+ }
+
+ @Test
+ public void testExternalFooterCachePpd() throws Exception {
+ testFooterExternalCacheImpl(true);
+ }
+
+ private final static class FsWithHash {
+ private FileSplit fs;
+ public FsWithHash(FileSplit fs) {
+ this.fs = fs;
+ }
+ @Override
+ public int hashCode() {
+ if (fs == null) return 0;
+ final int prime = 31;
+ int result = prime * 1 + fs.getPath().hashCode();
+ result = prime * result + Long.valueOf(fs.getStart()).hashCode();
+ return prime * result + Long.valueOf(fs.getLength()).hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof FsWithHash)) return false;
+ FsWithHash other = (FsWithHash)obj;
+ if ((fs == null) != (other.fs == null)) return false;
+ if (fs == null && other.fs == null) return true;
+ return fs.getStart() == other.fs.getStart() && fs.getLength() == other.fs.getLength()
+ && fs.getPath().equals(other.fs.getPath());
+ }
+ }
+
+ private void testFooterExternalCacheImpl(boolean isPpd) throws IOException {
+ ObjectInspector inspector = createIO();
+ writeFile(inspector, testFilePath);
+ writeFile(inspector, testFilePath2);
+
+ GenericUDF udf = new GenericUDFOPEqualOrLessThan();
+ List<ExprNodeDesc> childExpr = Lists.newArrayList();
+ createTestSarg(inspector, udf, childExpr);
+ setupExternalCacheConfig(isPpd, testFilePath + "," + testFilePath2);
+ // Get the base values w/o cache.
+ conf.setBoolean(ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED.varname, false);
+ OrcInputFormatForTest.clearLocalCache();
+ OrcInputFormat in0 = new OrcInputFormat();
+ InputSplit[] originals = in0.getSplits(conf, -1);
+ assertEquals(10, originals.length);
+ HashSet<FsWithHash> originalHs = new HashSet<>();
+ for (InputSplit original : originals) {
+ originalHs.add(new FsWithHash((FileSplit)original));
+ }
+
+ // Populate the cache.
+ conf.setBoolean(ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED.varname, true);
+ OrcInputFormatForTest in = new OrcInputFormatForTest();
+ OrcInputFormatForTest.clearLocalCache();
+ OrcInputFormatForTest.caches.resetCounts();
+ OrcInputFormatForTest.caches.cache.clear();
+ InputSplit[] splits = in.getSplits(conf, -1);
+ // Puts, gets, hits, unused, unused.
+ @SuppressWarnings("static-access")
+ AtomicInteger[] counts = { in.caches.putCount,
+ isPpd ? in.caches.getByExprCount : in.caches.getCount,
+ isPpd ? in.caches.getHitByExprCount : in.caches.getHitCount,
+ isPpd ? in.caches.getCount : in.caches.getByExprCount,
+ isPpd ? in.caches.getHitCount : in.caches.getHitByExprCount };
+
+ verifySplits(originalHs, splits);
+ verifyCallCounts(counts, 2, 2, 0);
+ assertEquals(2, OrcInputFormatForTest.caches.cache.size());
+
+ // Verify we can get from cache.
+ OrcInputFormatForTest.clearLocalCache();
+ OrcInputFormatForTest.caches.resetCounts();
+ splits = in.getSplits(conf, -1);
+ verifySplits(originalHs, splits);
+ verifyCallCounts(counts, 0, 2, 2);
+
+ // Verify ORC SARG still works.
+ OrcInputFormatForTest.clearLocalCache();
+ OrcInputFormatForTest.caches.resetCounts();
+ childExpr.set(1, new ExprNodeConstantDesc(5));
+ conf.set("hive.io.filter.expr.serialized", SerializationUtilities.serializeExpression(
+ new ExprNodeGenericFuncDesc(inspector, udf, childExpr)));
+ splits = in.getSplits(conf, -1);
+ InputSplit[] filtered = { originals[0], originals[4], originals[5], originals[9] };
+ originalHs = new HashSet<>();
+ for (InputSplit original : filtered) {
+ originalHs.add(new FsWithHash((FileSplit)original));
+ }
+ verifySplits(originalHs, splits);
+ verifyCallCounts(counts, 0, 2, 2);
+
+ // Verify corrupted cache value gets replaced.
+ OrcInputFormatForTest.clearLocalCache();
+ OrcInputFormatForTest.caches.resetCounts();
+ Map.Entry<Long, MockExternalCaches.MockItem> e =
+ OrcInputFormatForTest.caches.cache.entrySet().iterator().next();
+ Long key = e.getKey();
+ byte[] someData = new byte[8];
+ ByteBuffer toCorrupt = e.getValue().data;
+ System.arraycopy(toCorrupt.array(), toCorrupt.arrayOffset(), someData, 0, someData.length);
+ toCorrupt.putLong(0, 0L);
+ splits = in.getSplits(conf, -1);
+ verifySplits(originalHs, splits);
+ if (!isPpd) { // Recovery is not implemented yet for PPD path.
+ ByteBuffer restored = OrcInputFormatForTest.caches.cache.get(key).data;
+ byte[] newData = new byte[someData.length];
+ System.arraycopy(restored.array(), restored.arrayOffset(), newData, 0, newData.length);
+ assertArrayEquals(someData, newData);
+ }
+ }
+
+ private void verifyCallCounts(AtomicInteger[] counts, int puts, int gets, int hits) {
+ assertEquals("puts", puts, counts[0].get());
+ assertEquals("gets", gets, counts[1].get());
+ assertEquals("hits", hits, counts[2].get());
+ assertEquals("unused1", 0, counts[3].get());
+ assertEquals("unused2", 0, counts[4].get());
+ }
+
+ private void verifySplits(HashSet<FsWithHash> originalHs, InputSplit[] splits) {
+ if (originalHs.size() != splits.length) {
+ String s = "Expected [";
+ for (FsWithHash fwh : originalHs) {
+ s += toString(fwh.fs) + ", ";
+ }
+ s += "], actual [";
+ for (InputSplit fs : splits) {
+ s += toString((FileSplit)fs) + ", ";
+ }
+ fail(s + "]");
+ }
+ for (int i = 0; i < splits.length; ++i) {
+ FileSplit fs = (FileSplit)splits[i];
+ if (!originalHs.contains(new FsWithHash((FileSplit)splits[i]))) {
+ String s = " in [";
+ for (FsWithHash fwh : originalHs) {
+ s += toString(fwh.fs) + ", ";
+ }
+ fail("Cannot find " + toString(fs) + s);
+ }
+ }
+
+ }
+
+ private static String toString(FileSplit fs) {
+ return "{" + fs.getPath() + ", " + fs.getStart() + ", " + fs.getLength() + "}";
+ }
+
+ private void setupExternalCacheConfig(boolean isPpd, String paths) {
+ FileInputFormat.setInputPaths(conf, paths);
+ conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+ conf.setLong(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, 1000);
+ conf.setLong(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, 5000);
+ conf.setBoolean(ConfVars.HIVE_ORC_MS_FOOTER_CACHE_PPD.varname, isPpd);
+ conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, isPpd);
+ }
+
+ private ObjectInspector createIO() {
+ synchronized (TestOrcFile.class) {
+ return ObjectInspectorFactory
+ .getReflectionObjectInspector(AllTypesRow.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ }
+
+ private void writeFile(ObjectInspector inspector, Path filePath) throws IOException {
+ Writer writer = OrcFile.createWriter(
+ fs, filePath, conf, inspector, 100000, CompressionKind.NONE, 10000, 10000);
+ writeData(writer);
+ writer.close();
+ }
+
+ private void createTestSarg(
+ ObjectInspector inspector, GenericUDF udf, List<ExprNodeDesc> childExpr) {
+ childExpr.add(new ExprNodeColumnDesc(Long.class, "userid", "T", false));
+ childExpr.add(new ExprNodeConstantDesc(100));
+ conf.set("hive.io.filter.expr.serialized", SerializationUtilities.serializeExpression(
+ new ExprNodeGenericFuncDesc(inspector, udf, childExpr)));
+ }
+
private void writeData(Writer writer) throws IOException {
for (int i = 0; i < 25000; i++) {
if (i == 0) {
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java
index dc71db4..469a3da 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java
@@ -99,5 +99,4 @@ public interface PredicateLeaf {
*
*/
public List<Object> getLiteralList();
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
index be5e67b..8c5bab2 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
@@ -171,7 +171,6 @@ final class SearchArgumentImpl implements SearchArgument {
}
}
-
private final List<PredicateLeaf> leaves;
private final ExpressionTree expression;
[2/2] hive git commit: HIVE-11675 : make use of file footer PPD API in ETL strategy or separate strategy (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-11675 : make use of file footer PPD API in ETL strategy or separate strategy (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/868db42a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/868db42a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/868db42a
Branch: refs/heads/master
Commit: 868db42a695e3137c65b53386eb4d2b2ec76b265
Parents: 26b5c7b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Mar 15 18:37:29 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Mar 15 18:37:29 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hadoop/hive/metastore/FileFormatProxy.java | 6 +-
.../hive/metastore/FileMetadataHandler.java | 2 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 19 +-
.../hive/metastore/HiveMetaStoreClient.java | 43 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 4 +
.../filemeta/OrcFileMetadataHandler.java | 15 +-
orc/src/java/org/apache/orc/impl/InStream.java | 2 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 4 +-
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 17 +
.../hadoop/hive/ql/io/orc/ExternalCache.java | 338 +++++++++++
.../hadoop/hive/ql/io/orc/LocalCache.java | 112 ++++
.../io/orc/MetastoreExternalCachesByConf.java | 82 +++
.../hive/ql/io/orc/OrcFileFormatProxy.java | 14 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 593 ++++++++-----------
.../hive/ql/io/orc/OrcNewInputFormat.java | 16 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 12 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 12 +-
.../hive/ql/io/orc/TestOrcSplitElimination.java | 405 +++++++++++--
.../hadoop/hive/ql/io/sarg/PredicateLeaf.java | 1 -
.../hive/ql/io/sarg/SearchArgumentImpl.java | 1 -
21 files changed, 1286 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9fd6648..98c6372 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1213,6 +1213,9 @@ public class HiveConf extends Configuration {
HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false,
"Whether to enable using file metadata cache in metastore for ORC file footers."),
+ HIVE_ORC_MS_FOOTER_CACHE_PPD("hive.orc.splits.ms.footer.cache.ppd.enabled", true,
+ "Whether to enable file footer cache PPD (hive.orc.splits.ms.footer.cache.enabled\n" +
+ "must also be set to true for this to work)."),
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
"If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
@@ -1222,7 +1225,7 @@ public class HiveConf extends Configuration {
"generation. 0 means process directories individually. This can increase the number of\n" +
"metastore calls if metastore metadata cache is used."),
HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
- "Include file ID in splits on file systems thaty support it."),
+ "Include file ID in splits on file systems that support it."),
HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid", true,
"Allow synthetic file ID in splits on file systems that don't have a native one."),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,
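As a minimal configuration sketch (property names taken from this patch and its tests; defaults may differ per release), the new PPD flag is assumed to only take effect together with the footer cache flag, the ETL split strategy, and SARG filtering:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class FooterCachePpdConfSketch {
  static HiveConf footerCachePpdConf() {
    HiveConf conf = new HiveConf();
    conf.setVar(ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL");              // footer cache is exercised by the ETL strategy
    conf.setBoolVar(ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED, true);  // hive.orc.splits.ms.footer.cache.enabled
    conf.setBoolVar(ConfVars.HIVE_ORC_MS_FOOTER_CACHE_PPD, true);      // hive.orc.splits.ms.footer.cache.ppd.enabled
    conf.setBoolVar(ConfVars.HIVEOPTINDEXFILTER, true);                // SARG-based filtering, required for PPD
    return conf;
  }
}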
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java b/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
index ec0be2b..14ff187 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
/**
@@ -33,11 +34,10 @@ public interface FileFormatProxy {
/**
* Applies SARG to file metadata, and produces some result for this file.
* @param sarg SARG
- * @param byteBuffer File metadata from metastore cache.
+ * @param fileMetadata File metadata from metastore cache.
* @return The result to return to client for this file, or null if file is eliminated.
- * @throws IOException
*/
- ByteBuffer applySargToMetadata(SearchArgument sarg, ByteBuffer byteBuffer) throws IOException;
+ SplitInfos applySargToMetadata(SearchArgument sarg, ByteBuffer fileMetadata) throws IOException;
/**
* @param fs The filesystem of the file.
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
index bd4e188..832daec 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.metastore.hbase.MetadataStore;
* contains the actual implementation that depends on some stuff in QL (for ORC).
*/
public abstract class FileMetadataHandler {
- static final Log LOG = LogFactory.getLog(FileMetadataHandler.class);
+ protected static final Log LOG = LogFactory.getLog(FileMetadataHandler.class);
private Configuration conf;
private PartitionExpressionProxy expressionProxy;
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2fa0e9a..c9fadad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -5947,17 +5947,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean[] eliminated = new boolean[fileIds.size()];
getMS().getFileMetadataByExpr(fileIds, type, req.getExpr(), metadatas, ppdResults, eliminated);
- for (int i = 0; i < metadatas.length; ++i) {
- long fileId = fileIds.get(i);
- ByteBuffer metadata = metadatas[i];
- if (metadata == null) continue;
- metadata = (eliminated[i] || !needMetadata) ? null
- : handleReadOnlyBufferForThrift(metadata);
+ for (int i = 0; i < fileIds.size(); ++i) {
+ if (!eliminated[i] && ppdResults[i] == null) continue; // No metadata => no ppd.
MetadataPpdResult mpr = new MetadataPpdResult();
- ByteBuffer bitset = eliminated[i] ? null : handleReadOnlyBufferForThrift(ppdResults[i]);
- mpr.setMetadata(metadata);
- mpr.setIncludeBitset(bitset);
- result.putToMetadata(fileId, mpr);
+ ByteBuffer ppdResult = eliminated[i] ? null : handleReadOnlyBufferForThrift(ppdResults[i]);
+ mpr.setIncludeBitset(ppdResult);
+ if (needMetadata) {
+ ByteBuffer metadata = eliminated[i] ? null : handleReadOnlyBufferForThrift(metadatas[i]);
+ mpr.setMetadata(metadata);
+ }
+ result.putToMetadata(fileIds.get(i), mpr);
}
if (!result.isSetMetadata()) {
result.setMetadata(EMPTY_MAP_FM2); // Set the required field.
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 9048d45..cdd12ab 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
@@ -54,6 +53,8 @@ import org.apache.hadoop.hive.metastore.api.FireEventResponse;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
import org.apache.hadoop.hive.metastore.api.GetChangeVersionRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
@@ -79,6 +80,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -2247,15 +2249,48 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
if (listIndex == fileIds.size()) return null;
int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size());
List<Long> subList = fileIds.subList(listIndex, endIndex);
- GetFileMetadataRequest req = new GetFileMetadataRequest();
- req.setFileIds(subList);
- GetFileMetadataResult resp = client.get_file_metadata(req);
+ GetFileMetadataResult resp = sendGetFileMetadataReq(subList);
+ // TODO: we could remember if it's unsupported and stop sending calls; although, it might
+ // be a bad idea for HS2+standalone metastore that could be updated with support.
+ // Maybe we should just remember this for some time.
+ if (!resp.isIsSupported()) return null;
listIndex = endIndex;
return resp.getMetadata();
}
};
}
+ private GetFileMetadataResult sendGetFileMetadataReq(List<Long> fileIds) throws TException {
+ return client.get_file_metadata(new GetFileMetadataRequest(fileIds));
+ }
+
+ @Override
+ public Iterable<Entry<Long, MetadataPpdResult>> getFileMetadataBySarg(
+ final List<Long> fileIds, final ByteBuffer sarg, final boolean doGetFooters)
+ throws TException {
+ return new MetastoreMapIterable<Long, MetadataPpdResult>() {
+ private int listIndex = 0;
+ @Override
+ protected Map<Long, MetadataPpdResult> fetchNextBatch() throws TException {
+ if (listIndex == fileIds.size()) return null;
+ int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size());
+ List<Long> subList = fileIds.subList(listIndex, endIndex);
+ GetFileMetadataByExprResult resp = sendGetFileMetadataBySargReq(
+ sarg, subList, doGetFooters);
+ if (!resp.isIsSupported()) return null;
+ listIndex = endIndex;
+ return resp.getMetadata();
+ }
+ };
+ }
+
+ private GetFileMetadataByExprResult sendGetFileMetadataBySargReq(
+ ByteBuffer sarg, List<Long> fileIds, boolean doGetFooters) throws TException {
+ GetFileMetadataByExprRequest req = new GetFileMetadataByExprRequest(fileIds, sarg);
+ req.setDoGetFooters(doGetFooters); // No need to get footers
+ return client.get_file_metadata_by_expr(req);
+ }
+
public static abstract class MetastoreMapIterable<K, V>
implements Iterable<Entry<K, V>>, Iterator<Entry<K, V>> {
private Iterator<Entry<K, V>> currentIter;
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 62677d1..39cf927 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -1534,6 +1535,9 @@ public interface IMetaStoreClient {
*/
Iterable<Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds) throws TException;
+ Iterable<Entry<Long, MetadataPpdResult>> getFileMetadataBySarg(
+ List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws TException;
+
/**
* Cleares the file metadata cache for respective file IDs.
*/
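For the sarg parameter, a hedged sketch of producing the serialized SearchArgument the new getFileMetadataBySarg call expects, following the Kryo-based approach ExternalCache uses later in this patch (the SearchArgument instance is a placeholder):

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public class SargSerializationSketch {
  static ByteBuffer serializeSarg(SearchArgument sarg) {
    Kryo kryo = SerializationUtilities.borrowKryo();
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      Output output = new Output(baos);
      kryo.writeObject(output, sarg); // Kryo-serialize the SARG into a byte buffer
      output.flush();
      return ByteBuffer.wrap(baos.toByteArray());
    } finally {
      SerializationUtilities.releaseKryo(kryo);
    }
  }
}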
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
index 1b388aa..3bca85d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hive.metastore.FileMetadataHandler;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -44,11 +45,21 @@ public class OrcFileMetadataHandler extends FileMetadataHandler {
}
getStore().getFileMetadata(fileIds, metadatas);
for (int i = 0; i < metadatas.length; ++i) {
+ eliminated[i] = false;
+ results[i] = null;
if (metadatas[i] == null) continue;
- ByteBuffer result = getFileFormatProxy().applySargToMetadata(sarg, metadatas[i]);
+ ByteBuffer metadata = metadatas[i].duplicate(); // Duplicate to avoid modification.
+ SplitInfos result = null;
+ try {
+ result = getFileFormatProxy().applySargToMetadata(sarg, metadata);
+ } catch (IOException ex) {
+ LOG.error("Failed to apply SARG to metadata", ex);
+ metadatas[i] = null;
+ continue;
+ }
eliminated[i] = (result == null);
if (!eliminated[i]) {
- results[i] = result;
+ results[i] = ByteBuffer.wrap(result.toByteArray());
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
index b1c6de5..1893afe 100644
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -35,7 +35,7 @@ import com.google.protobuf.CodedInputStream;
public abstract class InStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
- private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+ public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
protected final String name;
protected long length;
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2b50a2a..9446876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -481,7 +481,7 @@ public class AcidUtils {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
} catch (Throwable t) {
- LOG.error("Failed to get files with ID; using regular API", t);
+ LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
useFileIds = false;
}
}
@@ -648,7 +648,7 @@ public class AcidUtils {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
} catch (Throwable t) {
- LOG.error("Failed to get files with ID; using regular API", t);
+ LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
useFileIds = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 1a40847..b71ca09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -47,6 +47,23 @@ public class HdfsUtils {
return new SyntheticFileId(path, fs.getLen(), fs.getModificationTime());
}
+ public static long createFileId(String pathStr, FileStatus fs, boolean doLog, String fsName) {
+ int nameHash = pathStr.hashCode();
+ long fileSize = fs.getLen(), modTime = fs.getModificationTime();
+ int fileSizeHash = (int)(fileSize ^ (fileSize >>> 32)),
+ modTimeHash = (int)(modTime ^ (modTime >>> 32)),
+ combinedHash = modTimeHash ^ fileSizeHash;
+ long id = (((long)nameHash & 0xffffffffL) << 32) | ((long)combinedHash & 0xffffffffL);
+ if (doLog) {
+ LOG.warn("Cannot get unique file ID from " + fsName + "; using " + id
+ + " (" + pathStr + "," + nameHash + "," + fileSize + ")");
+ }
+ return id;
+ }
+
+
+
+
// TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
// is still going to work. Otherwise, file IDs can be turned off. Later, we should use
// as public utility method in HDFS to obtain the inode-based path.
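A short usage sketch of the new helper above: it derives a synthetic ID (not guaranteed unique) from the path hash combined with the size/modification-time hash, as a fallback for file systems without native file IDs; the FileSystem and Path here are placeholders:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HdfsUtils;

public class SyntheticFileIdSketch {
  static long syntheticId(FileSystem fs, Path path) throws IOException {
    FileStatus status = fs.getFileStatus(path);
    // doLog=false suppresses the "Cannot get unique file ID" warning; fsName only appears in that log line.
    return HdfsUtils.createFileId(path.toUri().getPath(), status, false, null);
  }
}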
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
new file mode 100644
index 0000000..6556fbf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileInfo;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FooterCache;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.orc.FileMetaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+/** Metastore-based footer cache storing serialized footers. Also has a local cache. */
+public class ExternalCache implements FooterCache {
+ private static final Logger LOG = LoggerFactory.getLogger(ExternalCache.class);
+ private static boolean isDebugEnabled = LOG.isDebugEnabled();
+
+ private final LocalCache localCache;
+ private final ExternalFooterCachesByConf externalCacheSrc;
+ private boolean isWarnLogged = false;
+
+ // Configuration and things set from it.
+ private HiveConf conf;
+ private boolean isInTest;
+ private SearchArgument sarg;
+ private ByteBuffer sargIsOriginal, sargNotIsOriginal;
+ private boolean isPpdEnabled;
+
+ public ExternalCache(LocalCache lc, ExternalFooterCachesByConf efcf) {
+ localCache = lc;
+ externalCacheSrc = efcf;
+ }
+
+ @Override
+ public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+ throws IOException {
+ localCache.put(fileId, file, fileMetaInfo, orcReader);
+ if (fileId != null) {
+ try {
+ externalCacheSrc.getCache(conf).putFileMetadata(Lists.newArrayList(fileId),
+ Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter()));
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return true;
+ }
+
+ @Override
+ public boolean hasPpd() {
+ return isPpdEnabled;
+ }
+
+ public void configure(HiveConf queryConfig) {
+ this.conf = queryConfig;
+ this.sarg = ConvertAstToSearchArg.createFromConf(conf);
+ this.isPpdEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVEOPTINDEXFILTER)
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_MS_FOOTER_CACHE_PPD);
+ this.isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
+ this.sargIsOriginal = this.sargNotIsOriginal = null;
+ }
+
+ @Override
+ public void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+ FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException {
+ assert result.length == files.size();
+ assert ppdResult == null || ppdResult.length == files.size();
+ // First, check the local cache.
+ localCache.getAndValidate(files, isOriginal, result, ppdResult);
+
+ // posMap is an unfortunate consequence of batching/iterating thru MS results.
+ HashMap<Long, Integer> posMap = new HashMap<Long, Integer>();
+ // We won't do metastore-side PPD for the things we have locally.
+ List<Long> fileIds = determineFileIdsToQuery(files, result, posMap);
+ // Need to get a new one, see the comment wrt threadlocals.
+ ExternalFooterCachesByConf.Cache cache = externalCacheSrc.getCache(conf);
+ ByteBuffer serializedSarg = null;
+ if (isPpdEnabled) {
+ serializedSarg = getSerializedSargForMetastore(isOriginal);
+ }
+ if (serializedSarg != null) {
+ Iterator<Entry<Long, MetadataPpdResult>> iter = cache.getFileMetadataByExpr(
+ fileIds, serializedSarg, false); // don't fetch the footer, PPD happens in MS.
+ while (iter.hasNext()) {
+ Entry<Long, MetadataPpdResult> e = iter.next();
+ int ix = getAndVerifyIndex(posMap, files, result, e.getKey());
+ processPpdResult(e.getValue(), files.get(ix), ix, result, ppdResult);
+ }
+ } else {
+ // Only populate corrupt IDs for the things we couldn't deserialize if we are not using
+ // ppd. We assume that PPD makes sure the cached values are correct (or fails otherwise);
+ // also, we don't use the footers in PPD case.
+ List<Long> corruptIds = null;
+ Iterator<Entry<Long, ByteBuffer>> iter = cache.getFileMetadata(fileIds);
+ while (iter.hasNext()) {
+ Entry<Long, ByteBuffer> e = iter.next();
+ int ix = getAndVerifyIndex(posMap, files, result, e.getKey());
+ if (!processBbResult(e.getValue(), ix, files.get(ix), result)) {
+ if (corruptIds == null) {
+ corruptIds = new ArrayList<>();
+ }
+ corruptIds.add(e.getKey());
+ }
+ }
+ if (corruptIds != null) {
+ cache.clearFileMetadata(corruptIds);
+ }
+ }
+ }
+
+ private int getAndVerifyIndex(HashMap<Long, Integer> posMap,
+ List<HdfsFileStatusWithId> files, FileInfo[] result, Long fileId) {
+ int ix = posMap.get(fileId);
+ assert result[ix] == null;
+ assert fileId != null && fileId.equals(files.get(ix).getFileId());
+ return ix;
+ }
+
+ private boolean processBbResult(
+ ByteBuffer bb, int ix, HdfsFileStatusWithId file, FileInfo[] result) throws IOException {
+ if (bb == null) return true;
+ result[ix] = createFileInfoFromMs(file, bb);
+ if (result[ix] == null) {
+ return false;
+ }
+
+ localCache.put(file.getFileStatus().getPath(), result[ix]);
+ return true;
+ }
+
+ private void processPpdResult(MetadataPpdResult mpr, HdfsFileStatusWithId file,
+ int ix, FileInfo[] result, ByteBuffer[] ppdResult) throws IOException {
+ if (mpr == null) return; // This file is unknown to metastore.
+
+ ppdResult[ix] = mpr.isSetIncludeBitset() ? mpr.bufferForIncludeBitset() : NO_SPLIT_AFTER_PPD;
+ if (mpr.isSetMetadata()) {
+ result[ix] = createFileInfoFromMs(file, mpr.bufferForMetadata());
+ if (result[ix] != null) {
+ localCache.put(file.getFileStatus().getPath(), result[ix]);
+ }
+ }
+ }
+
+ private List<Long> determineFileIdsToQuery(
+ List<HdfsFileStatusWithId> files, FileInfo[] result, HashMap<Long, Integer> posMap) {
+ for (int i = 0; i < result.length; ++i) {
+ if (result[i] != null) continue;
+ HdfsFileStatusWithId file = files.get(i);
+ final FileStatus fs = file.getFileStatus();
+ Long fileId = file.getFileId();
+ if (fileId == null) {
+ if (!isInTest) {
+ if (!isWarnLogged || isDebugEnabled) {
+ LOG.warn("Not using metastore cache because fileId is missing: " + fs.getPath());
+ isWarnLogged = true;
+ }
+ continue;
+ }
+ fileId = generateTestFileId(fs, files, i);
+ LOG.info("Generated file ID " + fileId + " at " + i);
+ }
+ posMap.put(fileId, i);
+ }
+ return Lists.newArrayList(posMap.keySet());
+ }
+
+ private Long generateTestFileId(final FileStatus fs, List<HdfsFileStatusWithId> files, int i) {
+ final Long fileId = HdfsUtils.createFileId(fs.getPath().toUri().getPath(), fs, false, null);
+ files.set(i, new HdfsFileStatusWithId() {
+ @Override
+ public FileStatus getFileStatus() {
+ return fs;
+ }
+
+ @Override
+ public Long getFileId() {
+ return fileId;
+ }
+ });
+ return fileId;
+ }
+
+ private ByteBuffer getSerializedSargForMetastore(boolean isOriginal) {
+ if (sarg == null) return null;
+ ByteBuffer serializedSarg = isOriginal ? sargIsOriginal : sargNotIsOriginal;
+ if (serializedSarg != null) return serializedSarg;
+ SearchArgument sarg2 = sarg;
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ if ((isOriginal ? sargNotIsOriginal : sargIsOriginal) == null) {
+ sarg2 = kryo.copy(sarg2); // In case we need it for the other case.
+ }
+ translateSargToTableColIndexes(sarg2, conf, OrcInputFormat.getRootColumn(isOriginal));
+ ExternalCache.Baos baos = new Baos();
+ Output output = new Output(baos);
+ kryo.writeObject(output, sarg2);
+ output.flush();
+ serializedSarg = baos.get();
+ if (isOriginal) {
+ sargIsOriginal = serializedSarg;
+ } else {
+ sargNotIsOriginal = serializedSarg;
+ }
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
+ return serializedSarg;
+ }
+
+ /**
+ * Modifies the SARG, replacing column names with column indexes in target table schema. This
+ * basically does the same thing as all the shennannigans with included columns, except for the
+ * last step where ORC gets direct subtypes of root column and uses the ordered match to map
+ * table columns to file columns. The numbers put into predicate leaf should allow to go into
+ * said subtypes directly by index to get the proper index in the file.
+ * This won't work with schema evolution, although it's probably much easier to reason about
+ * if schema evolution was to be supported, because this is a clear boundary between table
+ * schema columns and all things ORC. None of the ORC stuff is used here and none of the
+ * table schema stuff is used after that - ORC doesn't need a bunch of extra crap to apply
+ * the SARG thus modified.
+ */
+ public static void translateSargToTableColIndexes(
+ SearchArgument sarg, Configuration conf, int rootColumn) {
+ String nameStr = OrcInputFormat.getNeededColumnNamesString(conf),
+ idStr = OrcInputFormat.getSargColumnIDsString(conf);
+ String[] knownNames = nameStr.split(",");
+ String[] idStrs = (idStr == null) ? null : idStr.split(",");
+ assert idStrs == null || knownNames.length == idStrs.length;
+ HashMap<String, Integer> nameIdMap = new HashMap<>();
+ for (int i = 0; i < knownNames.length; ++i) {
+ Integer newId = (idStrs != null) ? Integer.parseInt(idStrs[i]) : i;
+ Integer oldId = nameIdMap.put(knownNames[i], newId);
+ if (oldId != null && oldId.intValue() != newId.intValue()) {
+ throw new RuntimeException("Multiple IDs for " + knownNames[i] + " in column strings: ["
+ + idStr + "], [" + nameStr + "]");
+ }
+ }
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ for (int i = 0; i < leaves.size(); ++i) {
+ PredicateLeaf pl = leaves.get(i);
+ Integer colId = nameIdMap.get(pl.getColumnName());
+ String newColName = RecordReaderImpl.encodeTranslatedSargColumn(rootColumn, colId);
+ SearchArgumentFactory.setPredicateLeafColumn(pl, newColName);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SARG translated into " + sarg);
+ }
+ }
+
+ private static FileInfo createFileInfoFromMs(
+ HdfsFileStatusWithId file, ByteBuffer bb) throws IOException {
+ if (bb == null) return null;
+ FileStatus fs = file.getFileStatus();
+ ReaderImpl.FooterInfo fi = null;
+ ByteBuffer copy = bb.duplicate();
+ try {
+ fi = ReaderImpl.extractMetaInfoFromFooter(copy, fs.getPath());
+ } catch (Exception ex) {
+ byte[] data = new byte[bb.remaining()];
+ System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), data, 0, data.length);
+ String msg = "Failed to parse the footer stored in cache for file ID "
+ + file.getFileId() + " " + bb + " [ " + Hex.encodeHexString(data) + " ]";
+ LOG.error(msg, ex);
+ return null;
+ }
+ return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(), fi.getMetadata(),
+ fi.getFooter().getTypesList(), fi.getFooter().getStatisticsList(), fi.getFileMetaInfo(),
+ fi.getFileMetaInfo().writerVersion, file.getFileId());
+ }
+
+ private static final class Baos extends ByteArrayOutputStream {
+ public ByteBuffer get() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+ }
+
+
+ /** An abstraction for testing ExternalCache in OrcInputFormat. */
+ public interface ExternalFooterCachesByConf {
+ public interface Cache {
+ Iterator<Map.Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(List<Long> fileIds,
+ ByteBuffer serializedSarg, boolean doGetFooters) throws HiveException;
+ void clearFileMetadata(List<Long> fileIds) throws HiveException;
+ Iterator<Map.Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds)
+ throws HiveException;
+ void putFileMetadata(
+ ArrayList<Long> keys, ArrayList<ByteBuffer> values) throws HiveException;
+ }
+
+ public Cache getCache(HiveConf conf) throws IOException;
+ }
+}
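Since ExternalFooterCachesByConf exists purely so tests can swap out the metastore-backed cache, a hypothetical in-memory double (not part of this patch) could satisfy the Cache contract with a plain HashMap; the only non-obvious part is that getFileMetadataByExpr may simply report nothing cached, which pushes callers onto the non-PPD path. A sketch under those assumptions, placed in the same package for visibility like MetastoreExternalCachesByConf below:

package org.apache.hadoop.hive.ql.io.orc;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;

public class InMemoryFooterCachesForTests implements ExternalFooterCachesByConf {
  private static class InMemoryCache implements Cache {
    private final Map<Long, ByteBuffer> footers = new HashMap<>();

    @Override
    public Iterator<Map.Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(
        List<Long> fileIds, ByteBuffer serializedSarg, boolean doGetFooters) {
      // No PPD support in this toy cache; report no cached metadata.
      return new HashMap<Long, MetadataPpdResult>().entrySet().iterator();
    }

    @Override
    public void clearFileMetadata(List<Long> fileIds) {
      footers.keySet().removeAll(fileIds);
    }

    @Override
    public Iterator<Map.Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds) {
      Map<Long, ByteBuffer> hits = new HashMap<>();
      for (Long id : fileIds) {
        ByteBuffer bb = footers.get(id);
        if (bb != null) {
          hits.put(id, bb.duplicate());
        }
      }
      return hits.entrySet().iterator();
    }

    @Override
    public void putFileMetadata(ArrayList<Long> keys, ArrayList<ByteBuffer> values) {
      for (int i = 0; i < keys.size(); ++i) {
        footers.put(keys.get(i), values.get(i));
      }
    }
  }

  @Override
  public Cache getCache(HiveConf conf) {
    return new InMemoryCache();
  }
}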
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
new file mode 100644
index 0000000..8151e52
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileInfo;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FooterCache;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.orc.FileMetaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/** Local footer cache using Guava. Stores convoluted Java objects. */
+class LocalCache implements FooterCache {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
+ private static boolean isDebugEnabled = LOG.isDebugEnabled();
+
+ private final Cache<Path, FileInfo> cache;
+
+ public LocalCache(int numThreads, int cacheStripeDetailsSize) {
+ cache = CacheBuilder.newBuilder()
+ .concurrencyLevel(numThreads)
+ .initialCapacity(cacheStripeDetailsSize)
+ .maximumSize(cacheStripeDetailsSize)
+ .softValues()
+ .build();
+ }
+
+ public void clear() {
+ cache.invalidateAll();
+ cache.cleanUp();
+ }
+
+ public void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+ FileInfo[] result, ByteBuffer[] ppdResult) throws IOException {
+ // TODO: should local cache also be by fileId? Preserve the original logic for now.
+ assert result.length == files.size();
+ int i = -1;
+ for (HdfsFileStatusWithId fileWithId : files) {
+ ++i;
+ FileStatus file = fileWithId.getFileStatus();
+ Path path = file.getPath();
+ Long fileId = fileWithId.getFileId();
+ FileInfo fileInfo = cache.getIfPresent(path);
+ if (isDebugEnabled) {
+ LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path);
+ }
+ if (fileInfo == null) continue;
+ if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId)
+ || (fileInfo.modificationTime == file.getModificationTime() &&
+ fileInfo.size == file.getLen())) {
+ result[i] = fileInfo;
+ continue;
+ }
+ // Invalidate
+ cache.invalidate(path);
+ if (isDebugEnabled) {
+ LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: "
+ + fileInfo.modificationTime + ", CurrentModificationTime: "
+ + file.getModificationTime() + ", CachedLength: " + fileInfo.size
+ + ", CurrentLength: " + file.getLen());
+ }
+ }
+ }
+
+ public void put(Path path, FileInfo fileInfo) {
+ cache.put(path, fileInfo);
+ }
+
+ @Override
+ public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+ throws IOException {
+ cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(),
+ orcReader.getStripes(), orcReader.getStripeStatistics(), orcReader.getTypes(),
+ orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
+ fileId));
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return false;
+ }
+
+ @Override
+ public boolean hasPpd() {
+ return false;
+ }
+}
\ No newline at end of file
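For readers unfamiliar with the Guava API used above: CacheBuilder produces a bounded, soft-valued cache keyed by Path, and staleness is handled manually by invalidating entries whose file id or modification time/length no longer match. A tiny stand-alone sketch of that construction and invalidation pattern, with String keys and values instead of Path/FileInfo and made-up sizes, looks roughly like this:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class SoftValueCacheSketch {
  public static void main(String[] args) {
    Cache<String, String> cache = CacheBuilder.newBuilder()
        .concurrencyLevel(10)       // numThreads in LocalCache
        .initialCapacity(10000)     // cacheStripeDetailsSize
        .maximumSize(10000)
        .softValues()               // entries may be reclaimed under memory pressure
        .build();

    cache.put("/warehouse/t/000000_0", "footer-info");
    System.out.println("cached: " + cache.getIfPresent("/warehouse/t/000000_0"));

    // Mirror of the staleness path: drop the entry when the file no longer matches.
    cache.invalidate("/warehouse/t/000000_0");
    System.out.println("after invalidate: " + cache.getIfPresent("/warehouse/t/000000_0"));
  }
}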
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
new file mode 100644
index 0000000..ad8f4ef
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * An implementation of external cache and factory based on metastore.
+ */
+public class MetastoreExternalCachesByConf implements ExternalFooterCachesByConf {
+ public static class HBaseCache implements ExternalFooterCachesByConf.Cache {
+ private Hive hive;
+
+ public HBaseCache(Hive hive) {
+ this.hive = hive;
+ }
+
+ @Override
+ public Iterator<Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(
+ List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException {
+ return hive.getFileMetadataByExpr(fileIds, sarg, doGetFooters).iterator();
+ }
+
+ @Override
+ public void clearFileMetadata(List<Long> fileIds) throws HiveException {
+ hive.clearFileMetadata(fileIds);
+ }
+
+ @Override
+ public Iterator<Entry<Long, ByteBuffer>> getFileMetadata(
+ List<Long> fileIds) throws HiveException {
+ return hive.getFileMetadata(fileIds).iterator();
+ }
+
+ @Override
+ public void putFileMetadata(
+ ArrayList<Long> fileIds, ArrayList<ByteBuffer> metadata) throws HiveException {
+ hive.putFileMetadata(fileIds, metadata);
+ }
+ }
+
+ @Override
+ public ExternalFooterCachesByConf.Cache getCache(HiveConf conf) throws IOException {
+ // TODO: we wish we could cache the Hive object, but it's not thread safe, and each
+ // threadlocal we "cache" would need to be reinitialized for every query. This is
+ // a huge PITA. Hive object will be cached internally, but the compat check will be
+ // done every time inside get().
+ try {
+ return new HBaseCache(Hive.getWithFastCheck(conf));
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
+ }
+}
\ No newline at end of file
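A hedged usage sketch of the factory above, assuming a reachable metastore behind the given HiveConf plus made-up file IDs and footer bytes; it only exercises the put/get/clear round trip, not the PPD call, and sits in the same package for visibility:

package org.apache.hadoop.hive.ql.io.orc;

import java.nio.ByteBuffer;
import java.util.ArrayList;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;

public class MetastoreFooterCacheUsageSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    ExternalFooterCachesByConf.Cache cache = new MetastoreExternalCachesByConf().getCache(conf);

    ArrayList<Long> fileIds = new ArrayList<>();
    fileIds.add(42L);                                    // hypothetical HDFS file id
    ArrayList<ByteBuffer> footers = new ArrayList<>();
    footers.add(ByteBuffer.wrap(new byte[] {1, 2, 3}));  // hypothetical serialized footer

    cache.putFileMetadata(fileIds, footers);             // store
    cache.getFileMetadata(fileIds);                      // read back
    cache.clearFileMetadata(fileIds);                    // evict
  }
}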
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
index ef76723..c9c7b5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
@@ -29,16 +29,19 @@ import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** File format proxy for ORC. */
public class OrcFileFormatProxy implements FileFormatProxy {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcFileFormatProxy.class);
@Override
- public ByteBuffer applySargToMetadata(
- SearchArgument sarg, ByteBuffer byteBuffer) throws IOException {
+ public SplitInfos applySargToMetadata(
+ SearchArgument sarg, ByteBuffer fileMetadata) throws IOException {
// TODO: ideally we should store shortened representation of only the necessary fields
// in HBase; it will probably require custom SARG application code.
- ReaderImpl.FooterInfo fi = ReaderImpl.extractMetaInfoFromFooter(byteBuffer, null);
+ ReaderImpl.FooterInfo fi = ReaderImpl.extractMetaInfoFromFooter(fileMetadata, null);
OrcProto.Footer footer = fi.getFooter();
int stripeCount = footer.getStripesCount();
boolean[] result = OrcInputFormat.pickStripesViaTranslatedSarg(
@@ -52,10 +55,13 @@ public class OrcFileFormatProxy implements FileFormatProxy {
if (result != null && !result[i]) continue;
isEliminated = false;
StripeInformation si = stripes.get(i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PPD is adding a split " + i + ": " + si.getOffset() + ", " + si.getLength());
+ }
sb.addInfos(SplitInfo.newBuilder().setIndex(i)
.setOffset(si.getOffset()).setLength(si.getLength()));
}
- return isEliminated ? null : ByteBuffer.wrap(sb.build().toByteArray());
+ return isEliminated ? null : sb.build();
}
public ByteBuffer[] getAddedColumnsToCache() {
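With this change applySargToMetadata hands the protobuf SplitInfos back directly (or null when every stripe is eliminated), leaving any byte-level packaging to the caller instead of wrapping the result itself. A hedged consumer sketch, with a hypothetical helper name, might look like this:

import org.apache.hadoop.hive.metastore.Metastore.SplitInfo;
import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;

public class SplitInfosConsumerSketch {
  // Hypothetical helper: print what PPD kept, or report full elimination.
  static void report(SplitInfos infos) {
    if (infos == null) {
      System.out.println("every stripe eliminated by the SARG");
      return;
    }
    for (SplitInfo si : infos.getInfosList()) {
      System.out.println("stripe " + si.getIndex()
          + " offset=" + si.getOffset() + " length=" + si.getLength());
    }
  }
}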
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index cd2a668..8b611bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.orc.impl.InStream;
+
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -40,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -70,6 +70,8 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.Metastore;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -84,15 +86,14 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -111,10 +112,9 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.orc.OrcProto;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedInputStream;
/**
* A MapReduce/Hive input format for ORC files.
* <p>
@@ -274,7 +274,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* @param isOriginal is the file in the original format?
* @return the column number for the root of row.
*/
- private static int getRootColumn(boolean isOriginal) {
+ static int getRootColumn(boolean isOriginal) {
return isOriginal ? 0 : (OrcRecordUpdater.ROW + 1);
}
@@ -335,45 +335,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- /**
- * Modifies the SARG, replacing column names with column indexes in target table schema. This
- * basically does the same thing as all the shennannigans with included columns, except for the
- * last step where ORC gets direct subtypes of root column and uses the ordered match to map
- * table columns to file columns. The numbers put into predicate leaf should allow to go into
- * said subtypes directly by index to get the proper index in the file.
- * This won't work with schema evolution, although it's probably much easier to reason about
- * if schema evolution was to be supported, because this is a clear boundary between table
- * schema columns and all things ORC. None of the ORC stuff is used here and none of the
- * table schema stuff is used after that - ORC doesn't need a bunch of extra crap to apply
- * the SARG thus modified.
- */
- public static void translateSargToTableColIndexes(
- SearchArgument sarg, Configuration conf, int rootColumn) {
- String nameStr = getNeededColumnNamesString(conf), idStr = getSargColumnIDsString(conf);
- String[] knownNames = nameStr.split(",");
- String[] idStrs = (idStr == null) ? null : idStr.split(",");
- assert idStrs == null || knownNames.length == idStrs.length;
- HashMap<String, Integer> nameIdMap = new HashMap<>();
- for (int i = 0; i < knownNames.length; ++i) {
- Integer newId = (idStrs != null) ? Integer.parseInt(idStrs[i]) : i;
- Integer oldId = nameIdMap.put(knownNames[i], newId);
- if (oldId != null && oldId.intValue() != newId.intValue()) {
- throw new RuntimeException("Multiple IDs for " + knownNames[i] + " in column strings: ["
- + idStr + "], [" + nameStr + "]");
- }
- }
- List<PredicateLeaf> leaves = sarg.getLeaves();
- for (int i = 0; i < leaves.size(); ++i) {
- PredicateLeaf pl = leaves.get(i);
- Integer colId = nameIdMap.get(pl.getColumnName());
- String newColName = RecordReaderImpl.encodeTranslatedSargColumn(rootColumn, colId);
- SearchArgumentFactory.setPredicateLeafColumn(pl, newColName);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("SARG translated into " + sarg);
- }
- }
-
public static boolean[] genIncludedColumns(
List<OrcProto.Type> types, List<Integer> included, boolean isOriginal) {
int rootColumn = getRootColumn(isOriginal);
@@ -477,14 +438,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return getSargColumnNames(columnNamesString.split(","), types, include, isOriginal);
}
- private static String getNeededColumnNamesString(Configuration conf) {
+ static String getNeededColumnNamesString(Configuration conf) {
return conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
}
- private static String getSargColumnIDsString(Configuration conf) {
+ static String getSargColumnIDsString(Configuration conf) {
return conf.getBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, true) ? null
: conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
}
+
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
List<FileStatus> files
@@ -542,7 +504,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// This is not thread safe between different split generations (and wasn't anyway).
private FooterCache footerCache;
private static LocalCache localCache;
- private static MetastoreCache metaCache;
+ private static ExternalCache metaCache;
static ExecutorService threadPool = null;
private final int numBuckets;
private final int splitStrategyBatchMs;
@@ -559,10 +521,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final SearchArgument sarg;
Context(Configuration conf) {
- this(conf, 1);
+ this(conf, 1, null);
}
Context(Configuration conf, final int minSplits) {
+ this(conf, minSplits, null);
+ }
+
+ @VisibleForTesting
+ Context(Configuration conf, final int minSplits, ExternalFooterCachesByConf efc) {
this.conf = conf;
this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
this.sarg = ConvertAstToSearchArg.createFromConf(conf);
@@ -603,20 +570,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// HDFS, because only HDFS would return fileIds for us. If fileId is extended using
// size/mod time/etc. for other FSes, we might need to check FSes explicitly because
// using such an aggregate fileId cache is not bulletproof and should be disable-able.
- boolean useMetastoreCache = HiveConf.getBoolVar(
+ boolean useExternalCache = HiveConf.getBoolVar(
conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED);
if (localCache == null) {
localCache = new LocalCache(numThreads, cacheStripeDetailsSize);
}
- if (useMetastoreCache) {
+ if (useExternalCache) {
if (metaCache == null) {
- metaCache = new MetastoreCache(localCache);
+ metaCache = new ExternalCache(localCache,
+ efc == null ? new MetastoreExternalCachesByConf() : efc);
}
assert conf instanceof HiveConf;
metaCache.configure((HiveConf)conf);
}
// Set footer cache for current split generation. See field comment - not thread safe.
- footerCache = useMetastoreCache ? metaCache : localCache;
+ // TODO: we should be able to enable caches separately
+ footerCache = useExternalCache ? metaCache : localCache;
}
}
String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
@@ -638,6 +607,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
threadPool = null;
}
}
+
+ @VisibleForTesting
+ public static void clearLocalCache() {
+ if (localCache == null) return;
+ localCache.clear();
+ }
}
/**
@@ -676,12 +651,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final boolean isOriginal;
private final List<DeltaMetaData> deltas;
private final boolean hasBase;
+ private final ByteBuffer ppdResult;
- SplitInfo(Context context, FileSystem fs,
- HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
- boolean isOriginal,
- List<DeltaMetaData> deltas,
- boolean hasBase, Path dir, boolean[] covered) throws IOException {
+ SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
+ boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir,
+ boolean[] covered, ByteBuffer ppdResult) throws IOException {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fs = fs;
@@ -690,6 +664,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.isOriginal = isOriginal;
this.deltas = deltas;
this.hasBase = hasBase;
+ this.ppdResult = ppdResult;
}
@VisibleForTesting
@@ -697,7 +672,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
boolean[] covered) throws IOException {
this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
- fileInfo, isOriginal, deltas, hasBase, dir, covered);
+ fileInfo, isOriginal, deltas, hasBase, dir, covered, null);
}
}
@@ -719,14 +694,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final FileSystem fs;
}
-
Context context;
- List<ETLDir> dirs;
+ final List<ETLDir> dirs;
List<HdfsFileStatusWithId> files;
- boolean isOriginal;
- List<DeltaMetaData> deltas;
- boolean[] covered;
- private List<Future<List<OrcSplit>>> splitFuturesRef;
+ private final List<DeltaMetaData> deltas;
+ private final boolean[] covered;
+ final boolean isOriginal;
+ // References to external fields for async SplitInfo generation.
+ private List<Future<List<OrcSplit>>> splitFuturesRef = null;
+ private List<OrcSplit> splitsRef = null;
private final UserGroupInformation ugi;
private final boolean allowSyntheticFileIds;
@@ -748,10 +724,19 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
public List<SplitInfo> getSplits() throws IOException {
List<SplitInfo> result = new ArrayList<>(files.size());
// Force local cache if we have deltas.
- FooterCache cache = context.cacheStripeDetails ?
- (deltas == null ? context.footerCache : Context.localCache) : null;
+ FooterCache cache = context.cacheStripeDetails ? ((deltas == null || deltas.isEmpty())
+ ? context.footerCache : Context.localCache) : null;
if (cache != null) {
- FileInfo[] infos = cache.getAndValidate(files);
+ FileInfo[] infos = new FileInfo[files.size()];
+ ByteBuffer[] ppdResults = null;
+ if (cache.hasPpd()) {
+ ppdResults = new ByteBuffer[files.size()];
+ }
+ try {
+ cache.getAndValidate(files, isOriginal, infos, ppdResults);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
int dirIx = -1, fileInDirIx = -1, filesInDirCount = 0;
ETLDir dir = null;
for (int i = 0; i < files.size(); ++i) {
@@ -760,15 +745,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
filesInDirCount = dir.fileCount;
}
FileInfo info = infos[i];
+ ByteBuffer ppdResult = ppdResults == null ? null : ppdResults[i];
+ HdfsFileStatusWithId file = files.get(i);
if (info != null) {
// Cached copy is valid
context.cacheHitCounter.incrementAndGet();
}
- HdfsFileStatusWithId file = files.get(i);
- // ignore files of 0 length
- if (file.getFileStatus().getLen() > 0) {
- result.add(new SplitInfo(
- context, dir.fs, file, info, isOriginal, deltas, true, dir.dir, covered));
+ // Ignore files eliminated by PPD, or of 0 length.
+ if (ppdResult != FooterCache.NO_SPLIT_AFTER_PPD && file.getFileStatus().getLen() > 0) {
+ result.add(new SplitInfo(context, dir.fs, file, info,
+ isOriginal, deltas, true, dir.dir, covered, ppdResult));
}
}
} else {
@@ -781,8 +767,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
// ignore files of 0 length
if (file.getFileStatus().getLen() > 0) {
- result.add(new SplitInfo(
- context, dir.fs, file, null, isOriginal, deltas, true, dir.dir, covered));
+ result.add(new SplitInfo(context, dir.fs, file, null,
+ isOriginal, deltas, true, dir.dir, covered, null));
}
}
}
@@ -826,14 +812,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return CombineResult.YES;
}
- public Future<Void> generateSplitWork(
- Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+ public Future<Void> generateSplitWork(Context context,
+ List<Future<List<OrcSplit>>> splitFutures, List<OrcSplit> splits) throws IOException {
if ((context.cacheStripeDetails && context.footerCache.isBlocking())
|| context.forceThreadpool) {
this.splitFuturesRef = splitFutures;
+ this.splitsRef = splits;
return Context.threadPool.submit(this);
} else {
- runGetSplitsSync(splitFutures, null);
+ runGetSplitsSync(splitFutures, splits, null);
return null;
}
}
@@ -841,14 +828,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public Void call() throws IOException {
if (ugi == null) {
- runGetSplitsSync(splitFuturesRef, null);
+ runGetSplitsSync(splitFuturesRef, splitsRef, null);
return null;
}
try {
return ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- runGetSplitsSync(splitFuturesRef, ugi);
+ runGetSplitsSync(splitFuturesRef, splitsRef, ugi);
return null;
}
});
@@ -857,20 +844,43 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+
+
+
+
private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
- UserGroupInformation ugi) throws IOException {
- List<SplitInfo> splits = getSplits();
- List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+ List<OrcSplit> splits, UserGroupInformation ugi) throws IOException {
UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
- for (SplitInfo splitInfo : splits) {
- localList.add(Context.threadPool.submit(
- new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds)));
+ List<SplitInfo> splitInfos = getSplits();
+ List<Future<List<OrcSplit>>> localListF = null;
+ List<OrcSplit> localListS = null;
+ for (SplitInfo splitInfo : splitInfos) {
+ SplitGenerator sg = new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds);
+ if (!sg.isBlocking()) {
+ if (localListS == null) {
+ localListS = new ArrayList<>(splits.size());
+ }
+ // Already called in doAs, so no need to doAs here.
+ localListS.addAll(sg.call());
+ } else {
+ if (localListF == null) {
+ localListF = new ArrayList<>(splits.size());
+ }
+ localListF.add(Context.threadPool.submit(sg));
+ }
}
- synchronized (splitFutures) {
- splitFutures.addAll(localList);
+ if (localListS != null) {
+ synchronized (splits) {
+ splits.addAll(localListS);
+ }
}
- }
- }
+ if (localListF != null) {
+ synchronized (splitFutures) {
+ splitFutures.addAll(localListF);
+ }
+ }
+ }
+ }
/**
* BI strategy is used when the requirement is to spend less time in split generation
@@ -1018,7 +1028,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
try {
return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
} catch (Throwable t) {
- LOG.error("Failed to get files with ID; using regular API", t);
+ LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
}
}
@@ -1055,6 +1065,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private OrcFile.WriterVersion writerVersion;
private long projColsUncompressedSize;
private final List<OrcSplit> deltaSplits;
+ private final ByteBuffer ppdResult;
private final UserGroupInformation ugi;
private final boolean allowSyntheticFileIds;
@@ -1075,6 +1086,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
this.projColsUncompressedSize = -1;
this.deltaSplits = splitInfo.getSplits();
this.allowSyntheticFileIds = allowSyntheticFileIds;
+ this.ppdResult = splitInfo.ppdResult;
+ }
+
+ public boolean isBlocking() {
+ return ppdResult != null;
}
Path getPath() {
@@ -1182,6 +1198,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
}
+ private static final class OffsetAndLength { // Java cruft; pair of long.
+ public OffsetAndLength() {
+ this.offset = -1;
+ this.length = 0;
+ }
+
+ long offset, length;
+
+ @Override
+ public String toString() {
+ return "[offset=" + offset + ", length=" + length + "]";
+ }
+ }
+
/**
* Divide the adjacent stripes in the file into input splits based on the
* block size and the configured minimum and maximum sizes.
@@ -1204,74 +1234,122 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private List<OrcSplit> callInternal() throws IOException {
- populateAndCacheStripeDetails();
- List<OrcSplit> splits = Lists.newArrayList();
-
- // figure out which stripes we need to read
- boolean[] includeStripe = null;
+ // Figure out which stripes we need to read.
+ if (ppdResult != null) {
+ assert deltaSplits.isEmpty();
+ assert ppdResult.hasArray();
+
+ // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+ CodedInputStream cis = CodedInputStream.newInstance(
+ ppdResult.array(), ppdResult.arrayOffset(), ppdResult.remaining());
+ cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
+ return generateSplitsFromPpd(SplitInfos.parseFrom(cis));
+ } else {
+ populateAndCacheStripeDetails();
+ boolean[] includeStripe = null;
+ // We can't eliminate stripes if there are deltas because the
+ // deltas may change the rows making them match the predicate.
+ if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
+ String[] colNames = extractNeededColNames(types, context.conf, includedCols, isOriginal);
+ if (colNames == null) {
+ LOG.warn("Skipping split elimination for {} as column names is null", file.getPath());
+ } else {
+ includeStripe = pickStripes(context.sarg, colNames, writerVersion, isOriginal,
+ stripeStats, stripes.size(), file.getPath());
+ }
+ }
+ return generateSplitsFromStripes(includeStripe);
+ }
+ }
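The PPD branch above parses the cached SplitInfos straight out of the ByteBuffer's backing array via CodedInputStream, bumping the protobuf size limit so large results don't trip the default cap. A minimal stand-alone parse sketch, assuming an array-backed buffer and a made-up limit in place of InStream.PROTOBUF_MESSAGE_MAX_LIMIT:

import java.io.IOException;
import java.nio.ByteBuffer;

import com.google.protobuf.CodedInputStream;

import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;

public class SplitInfosParseSketch {
  static SplitInfos parse(ByteBuffer ppdResult) throws IOException {
    CodedInputStream cis = CodedInputStream.newInstance(
        ppdResult.array(), ppdResult.arrayOffset() + ppdResult.position(), ppdResult.remaining());
    cis.setSizeLimit(1 << 27); // made-up limit; the real code uses InStream.PROTOBUF_MESSAGE_MAX_LIMIT
    return SplitInfos.parseFrom(cis);
  }
}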
- // we can't eliminate stripes if there are deltas because the
- // deltas may change the rows making them match the predicate.
- if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
- String[] colNames = extractNeededColNames(types, context.conf, includedCols, isOriginal);
- if (colNames == null) {
- LOG.warn("Skipping split elimination for {} as column names is null", file.getPath());
- } else {
- includeStripe = pickStripes(context.sarg, colNames, writerVersion, isOriginal,
- stripeStats, stripes.size(), file.getPath());
+ private List<OrcSplit> generateSplitsFromPpd(SplitInfos ppdResult) throws IOException {
+ OffsetAndLength current = new OffsetAndLength();
+ List<OrcSplit> splits = new ArrayList<>(ppdResult.getInfosCount());
+ int lastIdx = -1;
+ for (Metastore.SplitInfo si : ppdResult.getInfosList()) {
+ int index = si.getIndex();
+ if (lastIdx >= 0 && lastIdx + 1 != index && current.offset != -1) {
+ // Create split for the previous unfinished stripe.
+ splits.add(createSplit(current.offset, current.length, null));
+ current.offset = -1;
+ }
+ lastIdx = index;
+ String debugStr = null;
+ if (LOG.isDebugEnabled()) {
+ debugStr = current.toString();
+ }
+ current = generateOrUpdateSplit(splits, current, si.getOffset(), si.getLength(), null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updated split from {" + index + ": " + si.getOffset() + ", "
+ + si.getLength() + "} and "+ debugStr + " to " + current);
}
}
+ generateLastSplit(splits, current, null);
+ return splits;
+ }
+ private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
+ List<OrcSplit> splits = new ArrayList<>(stripes.size());
// if we didn't have predicate pushdown, read everything
if (includeStripe == null) {
includeStripe = new boolean[stripes.size()];
Arrays.fill(includeStripe, true);
}
- long currentOffset = -1;
- long currentLength = 0;
+ OffsetAndLength current = new OffsetAndLength();
int idx = -1;
for (StripeInformation stripe : stripes) {
idx++;
if (!includeStripe[idx]) {
// create split for the previous unfinished stripe
- if (currentOffset != -1) {
- splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
- currentOffset = -1;
+ if (current.offset != -1) {
+ splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+ current.offset = -1;
}
continue;
}
- // if we are working on a stripe, over the min stripe size, and
- // crossed a block boundary, cut the input split here.
- if (currentOffset != -1 && currentLength > context.minSize &&
- (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
- splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
- currentOffset = -1;
- }
- // if we aren't building a split, start a new one.
- if (currentOffset == -1) {
- currentOffset = stripe.getOffset();
- currentLength = stripe.getLength();
- } else {
- currentLength =
- (stripe.getOffset() + stripe.getLength()) - currentOffset;
- }
- if (currentLength >= context.maxSize) {
- splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
- currentOffset = -1;
- }
- }
- if (currentOffset != -1) {
- splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
+ current = generateOrUpdateSplit(
+ splits, current, stripe.getOffset(), stripe.getLength(), fileMetaInfo);
}
+ generateLastSplit(splits, current, fileMetaInfo);
- // add uncovered ACID delta splits
+ // Add uncovered ACID delta splits.
splits.addAll(deltaSplits);
return splits;
}
+ private OffsetAndLength generateOrUpdateSplit(
+ List<OrcSplit> splits, OffsetAndLength current, long offset,
+ long length, FileMetaInfo fileMetaInfo) throws IOException {
+ // if we are working on a stripe, over the min stripe size, and
+ // crossed a block boundary, cut the input split here.
+ if (current.offset != -1 && current.length > context.minSize &&
+ (current.offset / blockSize != offset / blockSize)) {
+ splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+ current.offset = -1;
+ }
+ // if we aren't building a split, start a new one.
+ if (current.offset == -1) {
+ current.offset = offset;
+ current.length = length;
+ } else {
+ current.length = (offset + length) - current.offset;
+ }
+ if (current.length >= context.maxSize) {
+ splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+ current.offset = -1;
+ }
+ return current;
+ }
+
+ private void generateLastSplit(List<OrcSplit> splits, OffsetAndLength current,
+ FileMetaInfo fileMetaInfo) throws IOException {
+ if (current.offset == -1) return;
+ splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+ }
+
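The helpers above coalesce adjacent stripes into splits using three rules: cut when a split already above the minimum size crosses a block boundary, cut when it reaches the maximum size, and flush whatever is left at the end. A stand-alone numeric sketch of those rules, with made-up min/max/block sizes and plain longs instead of OffsetAndLength, is below:

import java.util.ArrayList;
import java.util.List;

public class StripeCoalescingSketch {
  static final long MIN_SIZE = 16L << 20;    // made-up context.minSize
  static final long MAX_SIZE = 256L << 20;   // made-up context.maxSize
  static final long BLOCK_SIZE = 128L << 20; // made-up HDFS block size

  public static void main(String[] args) {
    long[][] stripes = { {0, 64L << 20}, {64L << 20, 64L << 20}, {128L << 20, 200L << 20} };
    List<long[]> splits = new ArrayList<>();
    long offset = -1, length = 0;
    for (long[] s : stripes) {
      // Cut if we crossed a block boundary while already above the minimum size.
      if (offset != -1 && length > MIN_SIZE && (offset / BLOCK_SIZE != s[0] / BLOCK_SIZE)) {
        splits.add(new long[] {offset, length});
        offset = -1;
      }
      if (offset == -1) {         // start a new split
        offset = s[0];
        length = s[1];
      } else {                    // extend the current one
        length = (s[0] + s[1]) - offset;
      }
      if (length >= MAX_SIZE) {   // cut if we reached the maximum size
        splits.add(new long[] {offset, length});
        offset = -1;
      }
    }
    if (offset != -1) {           // flush the last unfinished split
      splits.add(new long[] {offset, length});
    }
    for (long[] sp : splits) {
      System.out.println("split offset=" + sp[0] + " length=" + sp[1]);
    }
  }
}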
private void populateAndCacheStripeDetails() throws IOException {
// Only create OrcReader if we are missing some information.
List<OrcProto.ColumnStatistics> colStatsLocal;
@@ -1290,7 +1368,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
assert fileInfo.stripeStats != null && fileInfo.types != null
&& fileInfo.writerVersion != null;
// We assume that if we needed to create a reader, we need to cache it to meta cache.
- // TODO: This will also needlessly overwrite it in local cache for now.
+ // This will also needlessly overwrite it in local cache for now.
context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
}
} else {
@@ -1330,10 +1408,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- static List<OrcSplit> generateSplitsInfo(Configuration conf)
- throws IOException {
- return generateSplitsInfo(conf, -1);
- }
/** Class intended to update two values from methods... Java-related cruft. */
@VisibleForTesting
@@ -1342,14 +1416,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
long combineStartUs;
}
- static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
+ static List<OrcSplit> generateSplitsInfo(Configuration conf, Context context)
throws IOException {
- // Use threads to resolve directories into splits.
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
- // Create HiveConf once, since this is expensive.
- conf = new HiveConf(conf, OrcInputFormat.class);
- }
- Context context = new Context(conf, numSplits);
if (LOG.isInfoEnabled()) {
LOG.info("ORC pushdown predicate: " + context.sarg);
}
@@ -1391,7 +1459,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (adi == null) {
// We were combining SS-es and the time has expired.
assert combinedCtx.combined != null;
- scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+ scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
combinedCtx.combined = null;
continue;
}
@@ -1409,7 +1477,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Hack note - different split strategies return differently typed lists, yay Java.
// This works purely by magic, because we know which strategy produces which type.
if (splitStrategy instanceof ETLSplitStrategy) {
- scheduleSplits((ETLSplitStrategy)splitStrategy, context, splitFutures, strategyFutures);
+ scheduleSplits((ETLSplitStrategy)splitStrategy,
+ context, splitFutures, strategyFutures, splits);
} else {
@SuppressWarnings("unchecked")
List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
@@ -1419,7 +1488,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Run the last combined strategy, if any.
if (combinedCtx != null && combinedCtx.combined != null) {
- scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+ scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
combinedCtx.combined = null;
}
@@ -1452,10 +1521,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return splits;
}
+ @VisibleForTesting
+ // We could have this as a protected method w/no class, but half of Hive is static, so there.
+ public static class ContextFactory {
+ public Context create(Configuration conf, int numSplits) {
+ return new Context(conf, numSplits);
+ }
+ }
+
private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context context,
- List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures)
- throws IOException {
- Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures);
+ List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures,
+ List<OrcSplit> splits) throws IOException {
+ Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures, splits);
if (ssFuture == null) return;
strategyFutures.add(ssFuture);
}
@@ -1504,7 +1581,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (isDebugEnabled) {
LOG.debug("getSplits started");
}
- List<OrcSplit> result = generateSplitsInfo(job, numSplits);
+ Configuration conf = job;
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
+ // Create HiveConf once, since this is expensive.
+ conf = new HiveConf(conf, OrcInputFormat.class);
+ }
+ List<OrcSplit> result = generateSplitsInfo(conf,
+ new Context(conf, numSplits, createExternalCaches()));
if (isDebugEnabled) {
LOG.debug("getSplits finished");
}
@@ -1517,10 +1600,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* Stores information relevant to split generation for an ORC File.
*
*/
- private static class FileInfo {
- private final long modificationTime;
- private final long size;
- private final Long fileId;
+ static class FileInfo {
+ final long modificationTime;
+ final long size;
+ final Long fileId;
private final List<StripeInformation> stripeInfos;
private FileMetaInfo fileMetaInfo;
private final List<StripeStatistics> stripeStats;
@@ -1898,196 +1981,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
* Represents footer cache.
*/
public interface FooterCache {
- FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException;
+ static final ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
+
+ void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+ FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
+ boolean hasPpd();
boolean isBlocking();
void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
throws IOException;
}
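The reworked contract distinguishes three ppdResult states per file: null (no cached PPD answer, so stripes are picked locally), the NO_SPLIT_AFTER_PPD sentinel (the whole file is eliminated), and any other buffer (serialized SplitInfos to turn into splits directly). A small illustration of that three-way branch, mirroring the checks in ETLSplitStrategy.getSplits and SplitGenerator.callInternal with stand-in names:

import java.nio.ByteBuffer;

public class PpdResultStatesSketch {
  static final ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);

  static String describe(ByteBuffer ppdResult) {
    if (ppdResult == null) {
      return "no cached PPD answer - pick stripes locally";
    }
    if (ppdResult == NO_SPLIT_AFTER_PPD) { // sentinel, compared by reference
      return "all stripes eliminated - skip the file entirely";
    }
    return "serialized SplitInfos - parse and emit the pre-computed splits";
  }

  public static void main(String[] args) {
    System.out.println(describe(null));
    System.out.println(describe(NO_SPLIT_AFTER_PPD));
    System.out.println(describe(ByteBuffer.wrap(new byte[] {1})));
  }
}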
- /** Local footer cache using Guava. Stores convoluted Java objects. */
- private static class LocalCache implements FooterCache {
- private final Cache<Path, FileInfo> cache;
-
- public LocalCache(int numThreads, int cacheStripeDetailsSize) {
- cache = CacheBuilder.newBuilder()
- .concurrencyLevel(numThreads)
- .initialCapacity(cacheStripeDetailsSize)
- .maximumSize(cacheStripeDetailsSize)
- .softValues()
- .build();
- }
-
- @Override
- public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) {
- // TODO: should local cache also be by fileId? Preserve the original logic for now.
- FileInfo[] result = new FileInfo[files.size()];
- int i = -1;
- for (HdfsFileStatusWithId fileWithId : files) {
- ++i;
- FileStatus file = fileWithId.getFileStatus();
- Path path = file.getPath();
- Long fileId = fileWithId.getFileId();
- FileInfo fileInfo = cache.getIfPresent(path);
- if (isDebugEnabled) {
- LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path);
- }
- if (fileInfo == null) continue;
- if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId)
- || (fileInfo.modificationTime == file.getModificationTime() &&
- fileInfo.size == file.getLen())) {
- result[i] = fileInfo;
- continue;
- }
- // Invalidate
- cache.invalidate(path);
- if (isDebugEnabled) {
- LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: "
- + fileInfo.modificationTime + ", CurrentModificationTime: "
- + file.getModificationTime() + ", CachedLength: " + fileInfo.size
- + ", CurrentLength: " + file.getLen());
- }
- }
- return result;
- }
-
- public void put(Path path, FileInfo fileInfo) {
- cache.put(path, fileInfo);
- }
-
- @Override
- public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
- throws IOException {
- cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(),
- orcReader.getStripes(), orcReader.getStripeStatistics(), orcReader.getTypes(),
- orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
- fileId));
- }
-
- @Override
- public boolean isBlocking() {
- return false;
- }
- }
-
- /** Metastore-based footer cache storing serialized footers. Also has a local cache. */
- public static class MetastoreCache implements FooterCache {
- private final LocalCache localCache;
- private boolean isWarnLogged = false;
- private HiveConf conf;
-
- public MetastoreCache(LocalCache lc) {
- localCache = lc;
- }
-
- @Override
- public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException {
- // First, check the local cache.
- FileInfo[] result = localCache.getAndValidate(files);
- assert result.length == files.size();
- // This is an unfortunate consequence of batching/iterating thru MS results.
- // TODO: maybe have a direct map call for small lists if this becomes a perf issue.
- HashMap<Long, Integer> posMap = new HashMap<>(files.size());
- for (int i = 0; i < result.length; ++i) {
- if (result[i] != null) continue;
- HdfsFileStatusWithId file = files.get(i);
- Long fileId = file.getFileId();
- if (fileId == null) {
- if (!isWarnLogged || isDebugEnabled) {
- LOG.warn("Not using metastore cache because fileId is missing: "
- + file.getFileStatus().getPath());
- isWarnLogged = true;
- }
- continue;
- }
- posMap.put(fileId, i);
- }
- Iterator<Entry<Long, ByteBuffer>> iter = null;
- Hive hive;
- try {
- hive = getHive();
- iter = hive.getFileMetadata(Lists.newArrayList(posMap.keySet()), conf).iterator();
- } catch (HiveException ex) {
- throw new IOException(ex);
- }
- List<Long> corruptIds = null;
- while (iter.hasNext()) {
- Entry<Long, ByteBuffer> e = iter.next();
- int ix = posMap.get(e.getKey());
- assert result[ix] == null;
- HdfsFileStatusWithId file = files.get(ix);
- assert file.getFileId() == e.getKey();
- result[ix] = createFileInfoFromMs(file, e.getValue());
- if (result[ix] == null) {
- if (corruptIds == null) {
- corruptIds = new ArrayList<>();
- }
- corruptIds.add(file.getFileId());
- } else {
- localCache.put(file.getFileStatus().getPath(), result[ix]);
- }
- }
- if (corruptIds != null) {
- try {
- hive.clearFileMetadata(corruptIds);
- } catch (HiveException ex) {
- LOG.error("Failed to clear corrupt cache data", ex);
- }
- }
- return result;
- }
-
- private Hive getHive() throws HiveException {
- // TODO: we wish we could cache the Hive object, but it's not thread safe, and each
- // threadlocal we "cache" would need to be reinitialized for every query. This is
- // a huge PITA. Hive object will be cached internally, but the compat check will be
- // done every time inside get().
- return Hive.getWithFastCheck(conf);
- }
-
- private static FileInfo createFileInfoFromMs(
- HdfsFileStatusWithId file, ByteBuffer bb) throws IOException {
- FileStatus fs = file.getFileStatus();
- ReaderImpl.FooterInfo fi = null;
- ByteBuffer original = bb.duplicate();
- try {
- fi = ReaderImpl.extractMetaInfoFromFooter(bb, fs.getPath());
- } catch (Exception ex) {
- byte[] data = new byte[original.remaining()];
- System.arraycopy(original.array(), original.arrayOffset() + original.position(),
- data, 0, data.length);
- String msg = "Failed to parse the footer stored in cache for file ID "
- + file.getFileId() + " " + original + " [ " + Hex.encodeHexString(data) + " ]";
- LOG.error(msg, ex);
- return null;
- }
- return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(), fi.getMetadata(),
- fi.getFooter().getTypesList(), fi.getFooter().getStatisticsList(), fi.getFileMetaInfo(),
- fi.getFileMetaInfo().writerVersion, file.getFileId());
- }
-
- @Override
- public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
- throws IOException {
- localCache.put(fileId, file, fileMetaInfo, orcReader);
- if (fileId != null) {
- try {
- getHive().putFileMetadata(Lists.newArrayList(fileId),
- Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter()));
- } catch (HiveException e) {
- throw new IOException(e);
- }
- }
- }
-
- public void configure(HiveConf queryConfig) {
- this.conf = queryConfig;
- }
-
- @Override
- public boolean isBlocking() {
- return true;
- }
- }
/**
* Convert a Hive type property string that contains separated type names into a list of
* TypeDescription objects.
@@ -2283,4 +2186,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return result;
}
+ @VisibleForTesting
+ protected ExternalFooterCachesByConf createExternalCaches() {
+ return null; // The default ones are created in case of null; tests override this.
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
index 2782d7e..c4a7226 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -121,9 +123,8 @@ public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits started");
}
- List<OrcSplit> splits =
- OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
- .getConfiguration(jobContext));
+ Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
+ List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, createContext(conf, -1));
List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
for(OrcSplit split: splits) {
result.add(new OrcNewSplit(split));
@@ -134,4 +135,13 @@ public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
return result;
}
+ // Nearly C/P from OrcInputFormat; there are too many statics everywhere to sort this out.
+ private Context createContext(Configuration conf, int numSplits) {
+ // Use threads to resolve directories into splits.
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
+ // Create HiveConf once, since this is expensive.
+ conf = new HiveConf(conf, OrcInputFormat.class);
+ }
+ return new Context(conf, numSplits, null);
+ }
}