You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/04/23 16:54:47 UTC
hive git commit: HIVE-19214: High throughput ingest ORC format
(Prasanth Jayachandran reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/branch-3 818c8cd50 -> f9d7e8c98
HIVE-19214: High throughput ingest ORC format (Prasanth Jayachandran reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f9d7e8c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f9d7e8c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f9d7e8c9
Branch: refs/heads/branch-3
Commit: f9d7e8c980ae47f3d88cff239ec7d3f1910ca15d
Parents: 818c8cd
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Apr 23 09:52:08 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Apr 23 09:52:25 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../llap/io/decode/OrcEncodedDataConsumer.java | 13 +++-
.../llap/io/encoded/OrcEncodedDataReader.java | 10 +--
.../llap/io/metadata/OrcStripeMetadata.java | 17 +++++
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 5 ++
.../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 7 ++
.../ql/io/orc/encoded/EncodedReaderImpl.java | 36 +++++++---
.../queries/clientpositive/orc_ppd_exception.q | 13 ++++
.../test/queries/clientpositive/vector_acid3.q | 10 +++
.../clientpositive/llap/vector_acid3.q.out | 34 +++++++++
.../clientpositive/orc_ppd_exception.q.out | 57 ++++++++++++++++
.../results/clientpositive/vector_acid3.q.out | 34 +++++++++
.../apache/hive/streaming/TestStreaming.java | 72 ++++++++++++++++++++
13 files changed, 293 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 607f8ba..a6422de 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1874,6 +1874,9 @@ public class HiveConf extends Configuration {
HIVE_ORC_BASE_DELTA_RATIO("hive.exec.orc.base.delta.ratio", 8, "The ratio of base writer and\n" +
"delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."),
+ HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED("hive.exec.orc.delta.streaming.optimizations.enabled", false,
+ "Whether to enable streaming optimizations for ORC delta files. This will disable ORC's internal indexes,\n" +
+ "disable compression, enable fast encoding and disable dictionary encoding."),
HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet("HYBRID", "BI", "ETL"),
"This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation" +
" as opposed to query execution (split generation does not read or cache file footers)." +
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 9e8ae10..fc0c66a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -113,18 +113,25 @@ public class OrcEncodedDataConsumer
ConsumerStripeMetadata stripeMetadata = stripes.get(currentStripeIndex);
// Get non null row count from root column, to get max vector batches
int rgIdx = batch.getBatchKey().rgIx;
- long nonNullRowCount = -1;
+ long nonNullRowCount;
+ boolean noIndex = false;
if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) {
nonNullRowCount = stripeMetadata.getRowCount();
} else {
OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexEntry(0, rgIdx);
- nonNullRowCount = getRowCount(rowIndex);
+ // index is disabled
+ if (rowIndex == null) {
+ nonNullRowCount = stripeMetadata.getRowCount();
+ noIndex = true;
+ } else {
+ nonNullRowCount = getRowCount(rowIndex);
+ }
}
int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
TypeDescription fileSchema = fileMetadata.getSchema();
- if (columnReaders == null || !sameStripe) {
+ if (columnReaders == null || !sameStripe || noIndex) {
createColumnReaders(batch, stripeMetadata, fileSchema);
} else {
repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata);
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index afb8fc5..4033b37 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -692,10 +692,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return result;
} finally {
try {
- if (isPool && !isCodecError) {
- OrcCodecPool.returnCodec(kind, codec);
- } else {
- codec.close();
+ if (codec != null) {
+ if (isPool && !isCodecError) {
+ OrcCodecPool.returnCodec(kind, codec);
+ } else {
+ codec.close();
+ }
}
} catch (Exception ex) {
LOG.error("Ignoring codec cleanup error", ex);
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 3d9e99c..382d398 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -88,11 +88,28 @@ public class OrcStripeMetadata implements ConsumerStripeMetadata {
@Override
public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) {
+ if (rowIndex == null || rowIndex.getRowGroupIndex()[colIx] == null) {
+ return null;
+ }
return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx);
}
@Override
public boolean supportsRowIndexes() {
+ if (rowIndex == null) {
+ return false;
+ }
+ // if all row indexes are null then indexes are disabled
+ boolean allNulls = true;
+ for (OrcProto.RowIndex rowIndex : rowIndex.getRowGroupIndex()) {
+ if (rowIndex != null) {
+ allNulls = false;
+ break;
+ }
+ }
+ if (allNulls) {
+ return false;
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fe109d7..dc6cc62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -2199,6 +2199,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
// column statistics at index 0 contains only the number of rows
ColumnStatistics stats = stripeStatistics.getColumnStatistics()[filterColumns[pred]];
+ // if row count is 0 and there are no nulls, it means the index is disabled and we don't have stats
+ if (stats.getNumberOfValues() == 0 && !stats.hasNull()) {
+ truthValues[pred] = TruthValue.YES_NO_NULL;
+ continue;
+ }
PredicateLeaf leaf = predLeaves.get(pred);
try {
truthValues[pred] = RecordReaderImpl.evaluatePredicate(stats, leaf, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d850062..09f8802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -294,6 +294,13 @@ public class OrcRecordUpdater implements RecordUpdater {
writerOptions.bufferSize(baseBufferSizeValue / ratio);
writerOptions.stripeSize(baseStripeSizeValue / ratio);
writerOptions.blockPadding(false);
+ if (optionsCloneForDelta.getConfiguration().getBoolean(
+ HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED.varname, false)) {
+ writerOptions.compress(CompressionKind.NONE);
+ writerOptions.encodingStrategy(org.apache.orc.OrcFile.EncodingStrategy.SPEED);
+ writerOptions.rowIndexStride(0);
+ writerOptions.getConfiguration().set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "-1.0");
+ }
}
writerOptions.fileSystem(fs).callback(indexBuilder);
rowInspector = (StructObjectInspector)options.getInspector();
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 4e17394..1d7eceb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -395,7 +395,7 @@ class EncodedReaderImpl implements EncodedReader {
// We go by RG and not by column because that is how data is processed.
boolean hasError = true;
try {
- int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+ int rgCount = rowIndexStride == 0 ? 1 : (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
if (rgs != null && !rgs[rgIx]) {
continue; // RG filtered.
@@ -409,19 +409,31 @@ class EncodedReaderImpl implements EncodedReader {
ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length);
for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
ColumnReadContext ctx = colCtxs[colIx];
- if (ctx == null) continue; // This column is not included.
+ if (ctx == null) continue; // This column is not included
+
+ OrcProto.RowIndexEntry index;
+ OrcProto.RowIndexEntry nextIndex;
+ // index is disabled
+ if (ctx.rowIndex == null) {
+ if (isTracingEnabled) {
+ LOG.trace("Row index is null. Likely reading a file with indexes disabled.");
+ }
+ index = null;
+ nextIndex = null;
+ } else {
+ index = ctx.rowIndex.getEntry(rgIx);
+ nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
+ }
if (isTracingEnabled) {
LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, isLastRg, rgCount);
}
- OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
- nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
ecb.initOrcColumn(ctx.colIx);
trace.logStartCol(ctx.colIx);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- ColumnStreamData cb = null;
+ ColumnStreamData cb;
try {
- if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
+ if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
if (isTracingEnabled) {
LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
@@ -685,12 +697,14 @@ class EncodedReaderImpl implements EncodedReader {
@Override
public void close() throws IOException {
try {
- if (isCodecFromPool && !isCodecFailure) {
- OrcCodecPool.returnCodec(compressionKind, codec);
- } else {
- codec.close();
+ if (codec != null) {
+ if (isCodecFromPool && !isCodecFailure) {
+ OrcCodecPool.returnCodec(compressionKind, codec);
+ } else {
+ codec.close();
+ }
+ codec = null;
}
- codec = null;
} catch (Exception ex) {
LOG.error("Ignoring error from codec", ex);
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/test/queries/clientpositive/orc_ppd_exception.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_ppd_exception.q b/ql/src/test/queries/clientpositive/orc_ppd_exception.q
index 1513d91..a9db8f0 100644
--- a/ql/src/test/queries/clientpositive/orc_ppd_exception.q
+++ b/ql/src/test/queries/clientpositive/orc_ppd_exception.q
@@ -12,3 +12,16 @@ insert into table test_acid values (1, '2014-09-14 12:34:30');
delete from test_acid where ts = '2014-15-16 17:18:19.20';
select i,ts from test_acid where ts = '2014-15-16 17:18:19.20';
select i,ts from test_acid where ts <= '2014-09-14 12:34:30';
+
+drop table test_acid;
+set hive.exec.orc.delta.streaming.optimizations.enabled=true;
+
+create table test_acid( i int, ts timestamp)
+ clustered by (i) into 2 buckets
+ stored as orc
+ tblproperties ('transactional'='true');
+insert into table test_acid values (1, '2014-09-14 12:34:30');
+delete from test_acid where ts = '2014-15-16 17:18:19.20';
+select i,ts from test_acid where ts = '2014-15-16 17:18:19.20';
+select i,ts from test_acid where ts <= '2014-09-14 12:34:30';
+
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/test/queries/clientpositive/vector_acid3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_acid3.q b/ql/src/test/queries/clientpositive/vector_acid3.q
index d4313f4..d284e52 100644
--- a/ql/src/test/queries/clientpositive/vector_acid3.q
+++ b/ql/src/test/queries/clientpositive/vector_acid3.q
@@ -15,3 +15,13 @@ set hive.compute.query.using.stats=false;
set hive.vectorized.execution.enabled;
select count(1) from testacid1;
+
+drop table testacid1;
+set hive.exec.orc.delta.streaming.optimizations.enabled=true;
+
+create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true");
+
+insert into table testacid1 values (1),(2),(3),(4);
+
+select count(1) from testacid1;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/test/results/clientpositive/llap/vector_acid3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_acid3.q.out b/ql/src/test/results/clientpositive/llap/vector_acid3.q.out
index 46c82fc..396f76e 100644
--- a/ql/src/test/results/clientpositive/llap/vector_acid3.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_acid3.q.out
@@ -29,3 +29,37 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@testacid1
#### A masked pattern was here ####
4
+PREHOOK: query: drop table testacid1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@testacid1
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: drop table testacid1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@testacid1
+POSTHOOK: Output: default@testacid1
+PREHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testacid1
+PREHOOK: query: insert into table testacid1 values (1),(2),(3),(4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: insert into table testacid1 values (1),(2),(3),(4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@testacid1
+POSTHOOK: Lineage: testacid1.id SCRIPT []
+PREHOOK: query: select count(1) from testacid1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testacid1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(1) from testacid1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testacid1
+#### A masked pattern was here ####
+4
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/test/results/clientpositive/orc_ppd_exception.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_ppd_exception.q.out b/ql/src/test/results/clientpositive/orc_ppd_exception.q.out
index e03eb71..eee9176 100644
--- a/ql/src/test/results/clientpositive/orc_ppd_exception.q.out
+++ b/ql/src/test/results/clientpositive/orc_ppd_exception.q.out
@@ -47,3 +47,60 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@test_acid
#### A masked pattern was here ####
1 2014-09-14 12:34:30
+PREHOOK: query: drop table test_acid
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@test_acid
+PREHOOK: Output: default@test_acid
+POSTHOOK: query: drop table test_acid
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@test_acid
+POSTHOOK: Output: default@test_acid
+PREHOOK: query: create table test_acid( i int, ts timestamp)
+ clustered by (i) into 2 buckets
+ stored as orc
+ tblproperties ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_acid
+POSTHOOK: query: create table test_acid( i int, ts timestamp)
+ clustered by (i) into 2 buckets
+ stored as orc
+ tblproperties ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_acid
+PREHOOK: query: insert into table test_acid values (1, '2014-09-14 12:34:30')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_acid
+POSTHOOK: query: insert into table test_acid values (1, '2014-09-14 12:34:30')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_acid
+POSTHOOK: Lineage: test_acid.i SCRIPT []
+POSTHOOK: Lineage: test_acid.ts SCRIPT []
+PREHOOK: query: delete from test_acid where ts = '2014-15-16 17:18:19.20'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_acid
+PREHOOK: Output: default@test_acid
+POSTHOOK: query: delete from test_acid where ts = '2014-15-16 17:18:19.20'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_acid
+POSTHOOK: Output: default@test_acid
+PREHOOK: query: select i,ts from test_acid where ts = '2014-15-16 17:18:19.20'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_acid
+#### A masked pattern was here ####
+POSTHOOK: query: select i,ts from test_acid where ts = '2014-15-16 17:18:19.20'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_acid
+#### A masked pattern was here ####
+PREHOOK: query: select i,ts from test_acid where ts <= '2014-09-14 12:34:30'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_acid
+#### A masked pattern was here ####
+POSTHOOK: query: select i,ts from test_acid where ts <= '2014-09-14 12:34:30'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_acid
+#### A masked pattern was here ####
+1 2014-09-14 12:34:30
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/ql/src/test/results/clientpositive/vector_acid3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/vector_acid3.q.out b/ql/src/test/results/clientpositive/vector_acid3.q.out
index 46c82fc..396f76e 100644
--- a/ql/src/test/results/clientpositive/vector_acid3.q.out
+++ b/ql/src/test/results/clientpositive/vector_acid3.q.out
@@ -29,3 +29,37 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@testacid1
#### A masked pattern was here ####
4
+PREHOOK: query: drop table testacid1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@testacid1
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: drop table testacid1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@testacid1
+POSTHOOK: Output: default@testacid1
+PREHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@testacid1
+PREHOOK: query: insert into table testacid1 values (1),(2),(3),(4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@testacid1
+POSTHOOK: query: insert into table testacid1 values (1),(2),(3),(4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@testacid1
+POSTHOOK: Lineage: testacid1.id SCRIPT []
+PREHOOK: query: select count(1) from testacid1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@testacid1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(1) from testacid1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@testacid1
+#### A masked pattern was here ####
+4
http://git-wip-us.apache.org/repos/asf/hive/blob/f9d7e8c9/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 6f63bfb..e5dd3b3 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -1579,6 +1579,78 @@ public class TestStreaming {
}
@Test
+ public void testFileDumpDeltaFiles() throws Exception {
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true);
+ try {
+ dropDB(msClient, dbName3);
+ dropDB(msClient, dbName4);
+
+ // 1) Create a bucketed table
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ // 2) Insert data into the table
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ StreamingConnection connection = endPt.newConnection(false, agentInfo);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, conf, connection);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,streaming".getBytes());
+ txnBatch.write("name2,2,streaming".getBytes());
+ txnBatch.write("name4,2,unlimited".getBytes());
+ txnBatch.write("name5,2,unlimited".getBytes());
+ for (int i = 0; i < 6000; i++) {
+ if (i % 2 == 0) {
+ txnBatch.write(("name" + i + "," + i + "," + "streaming").getBytes());
+ } else {
+ txnBatch.write(("name" + i + "," + i + "," + "unlimited").getBytes());
+ }
+ }
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+ // replace stdout and run command
+ System.setOut(new PrintStream(myOut));
+ FileDump.main(new String[]{dbLocation});
+ System.out.flush();
+ System.setOut(origOut);
+
+ String outDump = new String(myOut.toByteArray());
+ // make sure delta files are written with no indexes, no compression and no dictionary
+ // no compression
+ Assert.assertEquals(true, outDump.contains("Compression: NONE"));
+ // no stats/indexes
+ Assert.assertEquals(true, outDump.contains("Column 0: count: 0 hasNull: false"));
+ Assert.assertEquals(true, outDump.contains("Column 1: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 2: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 3: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 4: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 5: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 6: count: 0 hasNull: false"));
+ Assert.assertEquals(true, outDump.contains("Column 7: count: 0 hasNull: false"));
+ Assert.assertEquals(true, outDump.contains("Column 8: count: 0 hasNull: false sum: 0"));
+ Assert.assertEquals(true, outDump.contains("Column 9: count: 0 hasNull: false"));
+ // no dictionary
+ Assert.assertEquals(true, outDump.contains("Encoding column 7: DIRECT_V2"));
+ Assert.assertEquals(true, outDump.contains("Encoding column 9: DIRECT_V2"));
+ } finally {
+ conf.unset(HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED.varname);
+ }
+ }
+
+ @Test
public void testFileDumpCorruptDataFiles() throws Exception {
dropDB(msClient, dbName3);