Posted to commits@hive.apache.org by se...@apache.org on 2015/09/04 04:55:17 UTC
[28/28] hive git commit: HIVE-11730 : LLAP: merge master into branch (Sergey Shelukhin)
HIVE-11730 : LLAP: merge master into branch (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/772c4b90
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/772c4b90
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/772c4b90
Branch: refs/heads/llap
Commit: 772c4b90f99dd6adafbc10e1bad5c5a40c803b7a
Parents: 0a20369 bb4f5e7
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Sep 3 19:53:51 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Sep 3 19:53:51 2015 -0700
----------------------------------------------------------------------
data/conf/hive-log4j2.xml | 3 -
errata.txt | 10 +
.../antlr4/org/apache/hive/hplsql/Hplsql.g4 | 85 +-
.../java/org/apache/hive/hplsql/Column.java | 65 +
.../main/java/org/apache/hive/hplsql/Exec.java | 142 +-
.../java/org/apache/hive/hplsql/Expression.java | 6 +
.../main/java/org/apache/hive/hplsql/Meta.java | 118 ++
.../main/java/org/apache/hive/hplsql/Row.java | 97 ++
.../java/org/apache/hive/hplsql/Select.java | 16 +-
.../main/java/org/apache/hive/hplsql/Stmt.java | 73 +-
.../main/java/org/apache/hive/hplsql/Var.java | 37 +-
.../apache/hive/hplsql/functions/Function.java | 13 +
.../org/apache/hive/hplsql/TestHplsqlLocal.java | 7 +-
.../apache/hive/hplsql/TestHplsqlOffline.java | 2 +-
.../src/test/queries/db/rowtype_attribute.sql | 22 +
hplsql/src/test/queries/db/type_attribute.sql | 8 +
.../local/create_procedure_no_params.sql | 19 +
.../test/queries/offline/create_table_ora.sql | 49 +
.../test/results/db/rowtype_attribute.out.txt | 42 +
.../src/test/results/db/type_attribute.out.txt | 15 +
.../local/create_procedure_no_params.out.txt | 26 +
.../results/offline/create_table_ora.out.txt | 38 +
.../hive/metastore/MetaStoreDirectSql.java | 29 +-
pom.xml | 2 +-
.../hive/ql/exec/tez/TezSessionState.java | 1 +
.../apache/hadoop/hive/ql/io/orc/FileDump.java | 48 +-
.../hadoop/hive/ql/io/orc/FileMetaInfo.java | 13 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 193 ++-
.../apache/hadoop/hive/ql/io/orc/Reader.java | 4 +
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 270 +++-
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 2 -
.../hive/ql/io/parquet/LeafFilterFactory.java | 43 +-
.../read/ParquetFilterPredicateConverter.java | 35 +-
.../hive/ql/io/sarg/ConvertAstToSearchArg.java | 3 -
.../hadoop/hive/ql/lib/DefaultGraphWalker.java | 80 +-
.../hadoop/hive/ql/lib/ForwardWalker.java | 33 +-
.../apache/hadoop/hive/ql/lib/RuleRegExp.java | 22 +-
.../hadoop/hive/ql/optimizer/ColumnPruner.java | 6 +-
.../hive/ql/optimizer/ConstantPropagate.java | 10 +-
.../ql/optimizer/IdentityProjectRemover.java | 15 +
.../hadoop/hive/ql/optimizer/IndexUtils.java | 13 +-
.../ql/optimizer/calcite/HiveRelOptUtil.java | 23 -
.../calcite/reloperators/HiveSort.java | 29 +-
.../rules/HiveJoinProjectTransposeRule.java | 238 +--
.../calcite/translator/HiveOpConverter.java | 22 +-
.../hadoop/hive/ql/parse/CalcitePlanner.java | 5 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 4 +
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 +-
.../hive/ql/io/orc/TestInputOutputFormat.java | 17 +-
.../hadoop/hive/ql/io/orc/TestOrcFile.java | 10 +-
.../hive/ql/io/orc/TestRecordReaderImpl.java | 42 +-
.../parquet/TestParquetRecordReaderWrapper.java | 50 +-
.../read/TestParquetFilterPredicate.java | 27 +-
.../ql/io/sarg/TestConvertAstToSearchArg.java | 128 +-
.../hive/ql/io/sarg/TestSearchArgumentImpl.java | 22 +-
.../clientpositive/cbo_rp_outer_join_ppr.q | 40 +
.../queries/clientpositive/groupby1_map_nomap.q | 2 +
ql/src/test/queries/clientpositive/groupby6.q | 2 +
.../clientpositive/groupby_grouping_id2.q | 2 +
.../clientpositive/groupby_ppr_multi_distinct.q | 2 +
ql/src/test/queries/clientpositive/having2.q | 27 +
.../clientpositive/parquet_ppd_boolean.q | 35 +
.../queries/clientpositive/parquet_ppd_char.q | 76 +
.../queries/clientpositive/parquet_ppd_date.q | 101 ++
.../clientpositive/parquet_ppd_decimal.q | 163 ++
.../clientpositive/parquet_ppd_partition.q | 9 +
.../clientpositive/parquet_ppd_timestamp.q | 98 ++
.../clientpositive/parquet_ppd_varchar.q | 76 +
.../clientpositive/parquet_predicate_pushdown.q | 297 +++-
.../test/queries/clientpositive/pointlookup3.q | 41 +
.../queries/clientpositive/ptfgroupbyjoin.q | 61 +
.../cbo_rp_outer_join_ppr.q.java1.7.out | 855 +++++++++++
.../clientpositive/constprog_partitioner.q.out | 30 +-
.../clientpositive/correlationoptimizer10.q.out | 48 +-
.../clientpositive/groupby1_map_nomap.q.out | 8 +-
.../test/results/clientpositive/groupby6.q.out | 8 +-
.../clientpositive/groupby_duplicate_key.q.out | 16 +-
.../clientpositive/groupby_grouping_id2.q.out | 28 +-
.../groupby_ppr_multi_distinct.q.out | 8 +-
.../test/results/clientpositive/having2.q.out | 353 +++++
.../clientpositive/parquet_ppd_boolean.q.out | 200 +++
.../clientpositive/parquet_ppd_char.q.out | 220 +++
.../clientpositive/parquet_ppd_date.q.out | 301 ++++
.../clientpositive/parquet_ppd_decimal.q.out | 490 ++++++
.../clientpositive/parquet_ppd_partition.q.out | 47 +
.../clientpositive/parquet_ppd_timestamp.q.out | 292 ++++
.../clientpositive/parquet_ppd_varchar.q.out | 220 +++
.../parquet_predicate_pushdown.q.out | 1309 +++++++++++++++-
.../results/clientpositive/pointlookup3.q.out | 1394 ++++++++++++++++++
.../results/clientpositive/ptfgroupbyjoin.q.out | 519 +++++++
.../spark/constprog_partitioner.q.out | 30 +-
.../spark/groupby1_map_nomap.q.out | 564 +++----
.../results/clientpositive/spark/groupby6.q.out | 20 +-
.../spark/groupby_grouping_id2.q.out | 38 +-
.../spark/groupby_ppr_multi_distinct.q.out | 16 +-
.../clientpositive/spark/subquery_exists.q.out | 12 +-
.../clientpositive/spark/subquery_in.q.out | 36 +-
.../spark/vector_mapjoin_reduce.q.out | 22 +-
.../clientpositive/subquery_exists.q.out | 12 +-
.../results/clientpositive/subquery_in.q.out | 36 +-
.../clientpositive/subquery_in_having.q.out | 50 +-
.../subquery_unqualcolumnrefs.q.out | 26 +-
.../results/clientpositive/subquery_views.q.out | 40 +-
.../clientpositive/tez/explainuser_1.q.out | 309 ++--
.../clientpositive/tez/subquery_exists.q.out | 12 +-
.../clientpositive/tez/subquery_in.q.out | 36 +-
.../clientpositive/tez/vector_inner_join.q.out | 14 +-
.../tez/vector_mapjoin_reduce.q.out | 24 +-
.../clientpositive/vector_inner_join.q.out | 12 +-
.../clientpositive/vector_mapjoin_reduce.q.out | 26 +-
.../hadoop/hive/ql/io/sarg/PredicateLeaf.java | 3 +-
111 files changed, 9648 insertions(+), 1485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index ac460b3,568ebbe..41a742c
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@@ -202,30 -165,9 +202,31 @@@ public class TezSessionState
// and finally we're ready to create and start the session
// generate basic tez config
TezConfiguration tezConfig = new TezConfiguration(conf);
+
+ // set up the staging directory to use
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+ Utilities.stripHivePasswordDetails(tezConfig);
+ ServicePluginsDescriptor servicePluginsDescriptor;
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
+
+ if (llapMode) {
+ // we need plugins to handle llap and uber mode
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
+ new TaskSchedulerDescriptor[]{
+ TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
+ .setUserPayload(servicePluginPayload)},
+ new ContainerLauncherDescriptor[]{
+ ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
+ new TaskCommunicatorDescriptor[]{
+ TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
+ .setUserPayload(servicePluginPayload)});
+ } else {
+ // no llap; the default service plugins descriptor still handles uber mode
+ servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
+ }
+
+ // Container prewarming: tell the AM how many containers we need.
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
n = Math.max(tezConfig.getInt(
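For context, the hunk above wires LLAP's task scheduler, container launcher, and task communicator into the Tez AM through a ServicePluginsDescriptor. A minimal standalone sketch of that wiring pattern follows; the descriptor factory calls mirror the diff, but the service name and plugin class names here are illustrative stand-ins, not the real LLAP constants from TezSessionState.

    import java.io.IOException;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.api.UserPayload;
    import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
    import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
    import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
    import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;

    public class ServicePluginsSketch {
      // Hypothetical names; the real constants live in TezSessionState.
      private static final String SVC = "my-service";
      private static final String SCHEDULER = "com.example.MyTaskScheduler";
      private static final String LAUNCHER = "com.example.MyContainerLauncher";
      private static final String COMMUNICATOR = "com.example.MyTaskCommunicator";

      static ServicePluginsDescriptor build(TezConfiguration tezConfig, boolean customMode)
          throws IOException {
        // Serialize the config once and share the payload across plugins.
        UserPayload payload = TezUtils.createUserPayloadFromConf(tezConfig);
        if (!customMode) {
          // Default scheduler/launcher/communicator, containers enabled.
          return ServicePluginsDescriptor.create(true);
        }
        return ServicePluginsDescriptor.create(true,
            new TaskSchedulerDescriptor[] {
                TaskSchedulerDescriptor.create(SVC, SCHEDULER).setUserPayload(payload) },
            new ContainerLauncherDescriptor[] {
                ContainerLauncherDescriptor.create(SVC, LAUNCHER) },
            new TaskCommunicatorDescriptor[] {
                TaskCommunicatorDescriptor.create(SVC, COMMUNICATOR).setUserPayload(payload) });
      }
    }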
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileMetaInfo.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileMetaInfo.java
index aced77b,0000000..2853119
mode 100644,000000..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileMetaInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileMetaInfo.java
@@@ -1,53 -1,0 +1,60 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
++import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
++
+/**
+ * FileMetaInfo represents the file metadata stored in the footer and postscript sections
+ * of the file that is useful to the Reader implementation.
+ *
+ */
+class FileMetaInfo {
++ ByteBuffer footerMetaAndPsBuffer;
+ final String compressionType;
+ final int bufferSize;
+ final int metadataSize;
+ final ByteBuffer footerBuffer;
+ final List<Integer> versionList;
+ final OrcFile.WriterVersion writerVersion;
+
++
++ /** Ctor used when reading splits - no version list or full footer buffer. */
+ FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
+ ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
+ this(compressionType, bufferSize, metadataSize, footerBuffer, null,
- writerVersion);
++ writerVersion, null);
+ }
+
++ /** Ctor used when creating file info during init and when getting a new one. */
+ public FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer, List<Integer> versionList,
- OrcFile.WriterVersion writerVersion){
++ ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion,
++ ByteBuffer fullFooterBuffer) {
+ this.compressionType = compressionType;
+ this.bufferSize = bufferSize;
+ this.metadataSize = metadataSize;
+ this.footerBuffer = footerBuffer;
+ this.versionList = versionList;
+ this.writerVersion = writerVersion;
++ this.footerMetaAndPsBuffer = fullFooterBuffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 380fd4e,cf8694e..5770bef
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@@ -51,9 -53,9 +53,10 @@@ import org.apache.hadoop.hive.ql.io.Aci
import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
@@@ -1053,16 -1023,18 +1025,18 @@@ public class OrcInputFormat implement
throws IOException {
// use threads to resolve directories into splits
Context context = new Context(conf, numSplits);
+ boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
List<OrcSplit> splits = Lists.newArrayList();
- List<Future<?>> pathFutures = Lists.newArrayList();
- List<Future<?>> splitFutures = Lists.newArrayList();
+ List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
+ List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
// multi-threaded file statuses and split strategy
- for (Path dir : getInputPaths(conf)) {
- boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
+ Path[] paths = getInputPaths(conf);
+ CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool);
+ for (Path dir : paths) {
FileSystem fs = dir.getFileSystem(conf);
FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
- pathFutures.add(context.threadPool.submit(fileGenerator));
+ pathFutures.add(ecs.submit(fileGenerator));
}
// complete path futures and schedule split generation
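The change above replaces plain lists of Futures with an ExecutorCompletionService so directory-listing results can be consumed as they complete rather than in submission order. A self-contained, JDK-only sketch of that pattern (the Callable body is a stand-in for the real FileGenerator):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CompletionOrderSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        List<String> dirs = Arrays.asList("dir1", "dir2", "dir3");  // stand-ins for input paths
        for (final String dir : dirs) {
          ecs.submit(() -> {
            // stand-in for FileGenerator: pretend to enumerate files under 'dir'
            return dir + ": listed";
          });
        }
        // take() hands back futures in completion order, so a slow directory
        // never blocks consumption of results from fast ones, unlike walking
        // a submission-ordered list of futures.
        for (int i = 0; i < dirs.size(); i++) {
          System.out.println(ecs.take().get());
        }
        pool.shutdown();
      }
    }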
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 71baabc,187924d..251e1f8
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@@ -352,15 -360,8 +353,18 @@@ public interface Reader
MetadataReader metadata() throws IOException;
+ List<Integer> getVersionList();
+
+ int getMetadataSize();
+
+ List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics();
+
+ List<StripeStatistics> getStripeStatistics();
+
+ List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics();
+
+ DataReader createDefaultDataReader(boolean useZeroCopy);
++
+ /** Gets serialized file metadata read from disk for the purposes of caching, etc. */
+ ByteBuffer getSerializedFileFooter();
-
- Footer getFooter();
}
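The new getSerializedFileFooter() accessor exposes the raw footer-plus-postscript bytes (when they were actually read from disk) so callers can cache them and later build readers without touching the file tail again. A hedged sketch of one way such a cache could use it; the cache map and key scheme here are assumptions, not Hive API:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;

    // Illustrative only: keeps the serialized ORC tail per path so later
    // readers can be built from cached bytes instead of re-reading the file.
    public class FooterCacheSketch {
      private final Map<String, ByteBuffer> cache = new ConcurrentHashMap<>();

      ByteBuffer footerFor(Path path, Configuration conf) throws IOException {
        ByteBuffer cached = cache.get(path.toString());
        if (cached != null) {
          return cached.duplicate();  // don't share position/limit with callers
        }
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        // May be null when the reader was built from cached metadata rather than disk.
        ByteBuffer footer = reader.getSerializedFileFooter();
        if (footer != null) {
          cache.put(path.toString(), footer);
        }
        return footer;
      }
    }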
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 4ff1e28,ab539c4..8661682
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@@ -82,8 -76,11 +82,11 @@@ public class ReaderImpl implements Read
// will help avoid cpu cycles spent in deserializing at the cost of increased
// memory footprint.
private final ByteBuffer footerByteBuffer;
+ // Same for metastore cache - maintains the same background buffer, but includes postscript.
+ // This will only be set if the file footer/metadata was read from disk.
+ private final ByteBuffer footerMetaAndPsBuffer;
- static class StripeInformationImpl
+ public static class StripeInformationImpl
implements StripeInformation {
private final OrcProto.StripeInformation stripe;
@@@ -314,61 -310,33 +316,59 @@@
this.path = path;
this.conf = options.getConfiguration();
- FileMetaInfo footerMetaData;
- if (options.getFileMetaInfo() != null) {
- footerMetaData = options.getFileMetaInfo();
+ FileMetadata fileMetadata = options.getFileMetadata();
+ if (fileMetadata != null) {
+ this.compressionKind = fileMetadata.getCompressionKind();
+ this.bufferSize = fileMetadata.getCompressionBufferSize();
+ this.codec = WriterImpl.createCodec(compressionKind);
+ this.metadataSize = fileMetadata.getMetadataSize();
+ this.stripeStats = fileMetadata.getStripeStats();
+ this.versionList = fileMetadata.getVersionList();
+ this.writerVersion = WriterVersion.from(fileMetadata.getWriterVersionNum());
+ this.types = fileMetadata.getTypes();
+ this.rowIndexStride = fileMetadata.getRowIndexStride();
+ this.contentLength = fileMetadata.getContentLength();
+ this.numberOfRows = fileMetadata.getNumberOfRows();
+ this.fileStats = fileMetadata.getFileStats();
+ this.stripes = fileMetadata.getStripes();
-
+ this.inspector = OrcStruct.createObjectInspector(0, fileMetadata.getTypes());
+ this.footerByteBuffer = null; // not cached and not needed here
+ this.userMetadata = null; // not cached and not needed here
+ this.footerMetaAndPsBuffer = null;
} else {
- footerMetaData = extractMetaInfoFromFooter(fs, path,
- options.getMaxLength());
- this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
- }
- MetaInfoObjExtractor rInfo =
- new MetaInfoObjExtractor(footerMetaData.compressionType,
- footerMetaData.bufferSize,
- footerMetaData.metadataSize,
- footerMetaData.footerBuffer
- );
- this.footerByteBuffer = footerMetaData.footerBuffer;
- this.compressionKind = rInfo.compressionKind;
- this.codec = rInfo.codec;
- this.bufferSize = rInfo.bufferSize;
- this.metadataSize = rInfo.metadataSize;
- this.metadata = rInfo.metadata;
- this.footer = rInfo.footer;
- this.inspector = rInfo.inspector;
- this.versionList = footerMetaData.versionList;
- this.writerVersion = footerMetaData.writerVersion;
+ FileMetaInfo footerMetaData;
+ if (options.getFileMetaInfo() != null) {
+ footerMetaData = options.getFileMetaInfo();
++ this.footerMetaAndPsBuffer = null;
+ } else {
+ footerMetaData = extractMetaInfoFromFooter(fs, path,
+ options.getMaxLength());
++ this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
+ }
+ MetaInfoObjExtractor rInfo =
+ new MetaInfoObjExtractor(footerMetaData.compressionType,
+ footerMetaData.bufferSize,
+ footerMetaData.metadataSize,
+ footerMetaData.footerBuffer
+ );
+ this.footerByteBuffer = footerMetaData.footerBuffer;
+ this.compressionKind = rInfo.compressionKind;
+ this.codec = rInfo.codec;
+ this.bufferSize = rInfo.bufferSize;
+ this.metadataSize = rInfo.metadataSize;
+ this.stripeStats = rInfo.metadata.getStripeStatsList();
+ this.types = rInfo.footer.getTypesList();
+ this.rowIndexStride = rInfo.footer.getRowIndexStride();
+ this.contentLength = rInfo.footer.getContentLength();
+ this.numberOfRows = rInfo.footer.getNumberOfRows();
+ this.userMetadata = rInfo.footer.getMetadataList();
+ this.fileStats = rInfo.footer.getStatisticsList();
+ this.inspector = rInfo.inspector;
+ this.versionList = footerMetaData.versionList;
+ this.writerVersion = footerMetaData.writerVersion;
- this.stripes = new ArrayList<StripeInformation>(rInfo.footer.getStripesCount());
- for(OrcProto.StripeInformation info: rInfo.footer.getStripesList()) {
- this.stripes.add(new StripeInformationImpl(info));
- }
++ this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
+ }
}
--
/**
* Get the WriterVersion based on the ORC file postscript.
* @param writerVersion the integer writer version
@@@ -383,6 -351,111 +383,115 @@@
return OrcFile.WriterVersion.ORIGINAL;
}
+ /** Extracts the necessary metadata from an externally stored buffer (fullFooterBuffer). */
+ public static FooterInfo extractMetaInfoFromFooter(
+ ByteBuffer bb, Path srcPath) throws IOException {
+ // Read the PostScript. Be very careful as some parts of this historically use bb position
+ // and some use absolute offsets that have to take position into account.
+ int baseOffset = bb.position();
+ int lastByteAbsPos = baseOffset + bb.remaining() - 1;
+ int psLen = bb.get(lastByteAbsPos) & 0xff;
+ int psAbsPos = lastByteAbsPos - psLen;
+ OrcProto.PostScript ps = extractPostScript(bb, srcPath, psLen, psAbsPos);
+ assert baseOffset == bb.position();
+
+ // Extract PS information.
+ int footerSize = (int)ps.getFooterLength(), metadataSize = (int)ps.getMetadataLength(),
+ footerAbsPos = psAbsPos - footerSize, metadataAbsPos = footerAbsPos - metadataSize;
+ String compressionType = ps.getCompression().toString();
+ CompressionCodec codec = WriterImpl.createCodec(CompressionKind.valueOf(compressionType));
+ int bufferSize = (int)ps.getCompressionBlockSize();
+ bb.position(metadataAbsPos);
+ bb.mark();
+
+ // Extract metadata and footer.
- Metadata metadata = new Metadata(extractMetadata(
- bb, metadataAbsPos, metadataSize, codec, bufferSize));
++ OrcProto.Metadata metadata = extractMetadata(
++ bb, metadataAbsPos, metadataSize, codec, bufferSize);
++ List<StripeStatistics> stats = new ArrayList<>(metadata.getStripeStatsCount());
++ for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) {
++ stats.add(new StripeStatistics(ss.getColStatsList()));
++ }
+ OrcProto.Footer footer = extractFooter(bb, footerAbsPos, footerSize, codec, bufferSize);
+ bb.position(metadataAbsPos);
+ bb.limit(psAbsPos);
+ // TODO: do we need footer buffer here? FileInfo/FileMetaInfo is a mess...
+ FileMetaInfo fmi = new FileMetaInfo(
+ compressionType, bufferSize, metadataSize, bb, extractWriterVersion(ps));
- return new FooterInfo(metadata, footer, fmi);
++ return new FooterInfo(stats, footer, fmi);
+ }
+
+ private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+ int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(footerAbsPos);
+ bb.limit(footerAbsPos + footerSize);
- InputStream instream = InStream.create("footer", Lists.<DiskRange>newArrayList(
++ InputStream instream = InStream.create(null, "footer", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), footerSize, codec, bufferSize);
+ return OrcProto.Footer.parseFrom(instream);
+ }
+
+ private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+ int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(metadataAbsPos);
+ bb.limit(metadataAbsPos + metadataSize);
- InputStream instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
++ InputStream instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+ CodedInputStream in = CodedInputStream.newInstance(instream);
+ int msgLimit = DEFAULT_PROTOBUF_MESSAGE_LIMIT;
+ OrcProto.Metadata meta = null;
+ do {
+ try {
+ in.setSizeLimit(msgLimit);
+ meta = OrcProto.Metadata.parseFrom(in);
+ } catch (InvalidProtocolBufferException e) {
+ if (e.getMessage().contains("Protocol message was too large")) {
+ LOG.warn("Metadata section is larger than " + msgLimit + " bytes. Increasing the max" +
+ " size of the coded input stream." );
+
+ msgLimit = msgLimit << 1;
+ if (msgLimit > PROTOBUF_MESSAGE_MAX_LIMIT) {
+ LOG.error("Metadata section exceeds max protobuf message size of " +
+ PROTOBUF_MESSAGE_MAX_LIMIT + " bytes.");
+ throw e;
+ }
+
+ // we must have failed in the middle of reading instream and instream doesn't support
+ // resetting the stream
- instream = InStream.create("metadata", Lists.<DiskRange>newArrayList(
++ instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList(
+ new BufferChunk(bb, 0)), metadataSize, codec, bufferSize);
+ in = CodedInputStream.newInstance(instream);
+ } else {
+ throw e;
+ }
+ }
+ } while (meta == null);
+ return meta;
+ }
+
+ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+ int psLen, int psAbsOffset) throws IOException {
+ // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+ assert bb.hasArray();
+ CodedInputStream in = CodedInputStream.newInstance(
+ bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+ checkOrcVersion(LOG, path, ps.getVersionList());
+
+ // Check compression codec.
+ switch (ps.getCompression()) {
+ case NONE:
+ break;
+ case ZLIB:
+ break;
+ case SNAPPY:
+ break;
+ case LZO:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
+ return ps;
+ }
+
private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
Path path,
long maxFileLength
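In the extractMetadata helper added above, the parse is retried with a doubled CodedInputStream size limit because protobuf caps messages at 64MB by default and a partially consumed stream cannot be rewound. A generic sketch of that retry pattern under those assumptions; the caller supplies a generated message's Parser (e.g. a hypothetical MyMessage.PARSER):

    import com.google.protobuf.CodedInputStream;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.Parser;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    public class ProtoLimitSketch {
      private static final int DEFAULT_LIMIT = 64 << 20;  // protobuf's default cap, 64MB
      private static final int MAX_LIMIT = 1024 << 20;    // give up past 1GB

      static <T> T parseWithGrowingLimit(byte[] serialized, Parser<T> parser) throws IOException {
        int limit = DEFAULT_LIMIT;
        while (true) {
          // Recreate the stream on every attempt: a failed parse leaves it
          // partially consumed, and CodedInputStream cannot be rewound.
          CodedInputStream cis =
              CodedInputStream.newInstance(new ByteArrayInputStream(serialized));
          cis.setSizeLimit(limit);
          try {
            return parser.parseFrom(cis);
          } catch (InvalidProtocolBufferException e) {
            if (!e.getMessage().contains("Protocol message was too large") || limit >= MAX_LIMIT) {
              throw e;
            }
            limit <<= 1;  // double the cap and retry, as extractMetadata does above
          }
        }
      }
    }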
@@@ -544,12 -579,82 +615,48 @@@
}
}
- public FileMetaInfo getFileMetaInfo(){
- /**
- * FileMetaInfo - represents file metadata stored in footer and postscript sections of the file
- * that is useful for Reader implementation
- *
- */
- static class FileMetaInfo {
- private ByteBuffer footerMetaAndPsBuffer;
- final String compressionType;
- final int bufferSize;
- final int metadataSize;
- final ByteBuffer footerBuffer;
- final List<Integer> versionList;
- final OrcFile.WriterVersion writerVersion;
-
- /** Ctor used when reading splits - no version list or full footer buffer. */
- FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer, OrcFile.WriterVersion writerVersion) {
- this(compressionType, bufferSize, metadataSize, footerBuffer, null,
- writerVersion, null);
- }
-
- /** Ctor used when creating file info during init and when getting a new one. */
- public FileMetaInfo(String compressionType, int bufferSize, int metadataSize,
- ByteBuffer footerBuffer, List<Integer> versionList, WriterVersion writerVersion,
- ByteBuffer fullFooterBuffer) {
- this.compressionType = compressionType;
- this.bufferSize = bufferSize;
- this.metadataSize = metadataSize;
- this.footerBuffer = footerBuffer;
- this.versionList = versionList;
- this.writerVersion = writerVersion;
- this.footerMetaAndPsBuffer = fullFooterBuffer;
- }
- }
-
+ public FileMetaInfo getFileMetaInfo() {
return new FileMetaInfo(compressionKind.toString(), bufferSize,
- metadataSize, footerByteBuffer, versionList, writerVersion);
+ metadataSize, footerByteBuffer, versionList, writerVersion, footerMetaAndPsBuffer);
}
+ /** Same as FileMetaInfo, but with extra fields. FileMetaInfo is serialized for splits
+ * and so we don't just add fields to it; it's already messy and confusing. */
+ public static final class FooterInfo {
+ private final OrcProto.Footer footer;
- private final Metadata metadata;
++ private final List<StripeStatistics> metadata;
+ private final List<StripeInformation> stripes;
+ private final FileMetaInfo fileMetaInfo;
+
- private FooterInfo(Metadata metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
++ private FooterInfo(
++ List<StripeStatistics> metadata, OrcProto.Footer footer, FileMetaInfo fileMetaInfo) {
+ this.metadata = metadata;
+ this.footer = footer;
+ this.fileMetaInfo = fileMetaInfo;
+ this.stripes = convertProtoStripesToStripes(footer.getStripesList());
+ }
+ public OrcProto.Footer getFooter() {
+ return footer;
+ }
+
- public Metadata getMetadata() {
++ public List<StripeStatistics> getMetadata() {
+ return metadata;
+ }
+
+ public FileMetaInfo getFileMetaInfo() {
+ return fileMetaInfo;
+ }
+
+ public List<StripeInformation> getStripes() {
+ return stripes;
+ }
+ }
+
+ @Override
+ public ByteBuffer getSerializedFileFooter() {
+ return footerMetaAndPsBuffer;
+ }
@Override
public RecordReader rows() throws IOException {
@@@ -607,17 -714,22 +714,24 @@@
@Override
public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
- return getRawDataSizeFromColIndices(colIndices, footer);
++ return getRawDataSizeFromColIndices(colIndices, types, fileStats);
+ }
+
+ public static long getRawDataSizeFromColIndices(
- List<Integer> colIndices, OrcProto.Footer footer) {
++ List<Integer> colIndices, List<OrcProto.Type> types,
++ List<OrcProto.ColumnStatistics> stats) {
long result = 0;
for (int colIdx : colIndices) {
- result += getRawDataSizeOfColumn(colIdx);
- result += getRawDataSizeOfColumn(colIdx, footer);
++ result += getRawDataSizeOfColumn(colIdx, types, stats);
}
return result;
}
- private long getRawDataSizeOfColumn(int colIdx) {
- OrcProto.ColumnStatistics colStat = fileStats.get(colIdx);
- private static long getRawDataSizeOfColumn(int colIdx, OrcProto.Footer footer) {
- OrcProto.ColumnStatistics colStat = footer.getStatistics(colIdx);
++ private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
++ List<OrcProto.ColumnStatistics> stats) {
++ OrcProto.ColumnStatistics colStat = stats.get(colIdx);
long numVals = colStat.getNumberOfValues();
- Type type = footer.getTypes(colIdx);
+ Type type = types.get(colIdx);
switch (type.getKind()) {
case BINARY:
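The last hunk converts the raw-data-size helpers from instance methods over the footer into static functions over the type and statistics lists, so callers holding only cached metadata can use them. A simplified sketch of the computation's shape; the enum, stats holder, and per-kind byte costs are placeholders, not ORC's actual accounting:

    import java.util.List;

    public class RawSizeSketch {
      enum Kind { INT, LONG, STRING }                  // stand-in for OrcProto.Type.Kind
      static class ColStats { long numberOfValues; }   // stand-in for OrcProto.ColumnStatistics

      static long rawDataSize(List<Integer> colIndices, List<Kind> types, List<ColStats> stats) {
        long result = 0;
        for (int colIdx : colIndices) {
          long numVals = stats.get(colIdx).numberOfValues;
          switch (types.get(colIdx)) {
            case INT:    result += numVals * 4L;  break;  // placeholder cost
            case LONG:   result += numVals * 8L;  break;  // placeholder cost
            case STRING: result += numVals * 16L; break;  // placeholder cost
          }
        }
        return result;
      }
    }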
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/results/clientpositive/tez/vector_mapjoin_reduce.q.out
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/772c4b90/ql/src/test/results/clientpositive/vector_inner_join.q.out
----------------------------------------------------------------------