Posted to commits@orc.apache.org by om...@apache.org on 2016/09/22 19:26:26 UTC
[3/3] orc git commit: ORC-101 Correct bloom filters for strings and decimals to use utf8 encoding.
ORC-101 Correct bloom filters for strings and decimals to use utf8 encoding.
Fixes #60
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/833cc0e7
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/833cc0e7
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/833cc0e7
Branch: refs/heads/master
Commit: 833cc0e73a301bf1aab4ae310e72318ed586efcb
Parents: 7118e96
Author: Owen O'Malley <om...@apache.org>
Authored: Tue Sep 13 13:28:44 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Sep 22 14:25:27 2016 -0500
----------------------------------------------------------------------
c++/include/orc/Reader.hh | 1 +
c++/src/Reader.cc | 2 +
.../src/java/org/apache/orc/BloomFilterIO.java | 50 --
.../src/java/org/apache/orc/DataReader.java | 7 +-
java/core/src/java/org/apache/orc/OrcConf.java | 10 +
java/core/src/java/org/apache/orc/OrcFile.java | 51 +-
.../java/org/apache/orc/TypeDescription.java | 26 +
.../orc/impl/ConvertTreeReaderFactory.java | 12 +-
.../src/java/org/apache/orc/impl/OrcIndex.java | 10 +-
.../org/apache/orc/impl/RecordReaderImpl.java | 70 ++-
.../org/apache/orc/impl/RecordReaderUtils.java | 192 ++++++--
.../org/apache/orc/impl/SchemaEvolution.java | 4 +
.../java/org/apache/orc/impl/StreamName.java | 1 +
.../java/org/apache/orc/impl/WriterImpl.java | 228 +++++++--
.../java/org/apache/orc/util/BloomFilter.java | 328 +++++++++++++
.../java/org/apache/orc/util/BloomFilterIO.java | 95 ++++
.../org/apache/orc/util/BloomFilterUtf8.java | 55 +++
.../test/org/apache/orc/TestVectorOrcFile.java | 4 +-
.../apache/orc/impl/TestRecordReaderImpl.java | 484 +++++++++++++------
.../org/apache/orc/util/TestBloomFilter.java | 92 ++++
.../test/org/apache/orc/util/TestMurmur3.java | 225 +++++++++
java/core/src/test/resources/log4j.properties | 3 +
.../src/test/resources/log4j.properties | 3 +
.../apache/hive/common/util/BloomFilter.java | 313 ------------
.../org/apache/hive/common/util/Murmur3.java | 335 -------------
.../src/java/org/apache/orc/util/Murmur3.java | 335 +++++++++++++
.../apache/hive/common/util/TestMurmur3.java | 224 ---------
.../src/java/org/apache/orc/tools/FileDump.java | 20 +-
.../java/org/apache/orc/tools/JsonFileDump.java | 27 +-
.../test/org/apache/orc/tools/TestFileDump.java | 6 +-
java/tools/src/test/resources/log4j.properties | 21 +
.../resources/orc-file-dump-bloomfilter.out | 106 ++--
.../resources/orc-file-dump-bloomfilter2.out | 121 +++--
.../orc-file-dump-dictionary-threshold.out | 2 +-
.../tools/src/test/resources/orc-file-dump.json | 150 +++---
java/tools/src/test/resources/orc-file-dump.out | 2 +-
.../src/test/resources/orc-file-has-null.out | 2 +-
proto/orc_proto.proto | 2 +
site/_data/releases.yml | 4 +
site/_docs/spec-index.md | 11 +-
site/_docs/stripes.md | 4 +
41 files changed, 2240 insertions(+), 1398 deletions(-)
----------------------------------------------------------------------
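For context on the fix: the old writer hashed strings and decimals through String.getBytes(), which uses the JVM's default character set, so filters written on one platform could disagree with probes computed on another. A standalone sketch of the mismatch (not part of this patch; the sample string is arbitrary):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class CharsetMismatchDemo {
      public static void main(String[] args) {
        String value = "déjà vu";
        byte[] platformBytes = value.getBytes();  // JVM default charset
        byte[] utf8Bytes = value.getBytes(StandardCharsets.UTF_8);
        // On a JVM whose default charset is not UTF-8 (e.g. windows-1252),
        // the two byte sequences differ, so the hashed filter bits differ
        // and membership probes silently miss.
        System.out.println(Arrays.equals(platformBytes, utf8Bytes));
      }
    }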
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/c++/include/orc/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 25a0a17..eacbd80 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -53,6 +53,7 @@ namespace orc {
WriterVersion_HIVE_4243 = 2,
WriterVersion_HIVE_12055 = 3,
WriterVersion_HIVE_13083 = 4,
+ WriterVersion_ORC_101 = 5,
WriterVersion_MAX = INT64_MAX
};
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 9b1f1b9..91f4ea1 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -72,6 +72,8 @@ namespace orc {
return "HIVE-12055";
case WriterVersion_HIVE_13083:
return "HIVE-13083";
+ case WriterVersion_ORC_101:
+ return "ORC-101";
}
std::stringstream buffer;
buffer << "future - " << version;
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/BloomFilterIO.java b/java/core/src/java/org/apache/orc/BloomFilterIO.java
deleted file mode 100644
index 106227d..0000000
--- a/java/core/src/java/org/apache/orc/BloomFilterIO.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc;
-
-import org.apache.hive.common.util.BloomFilter;
-
-public class BloomFilterIO extends BloomFilter {
-
- public BloomFilterIO(long expectedEntries) {
- super(expectedEntries, DEFAULT_FPP);
- }
-
- public BloomFilterIO(long expectedEntries, double fpp) {
- super(expectedEntries, fpp);
- }
-
- static long[] toArray(OrcProto.BloomFilter filter) {
- long[] result = new long[filter.getBitsetCount()];
- int i =0;
- for(Long l: filter.getBitsetList()) {
- result[i++] = l;
- }
- return result;
- }
-
-/**
- * Initializes the BloomFilter from the given Orc BloomFilter
- */
- public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
- this.bitSet = new BitSet(toArray(bloomFilter));
- this.numHashFunctions = bloomFilter.getNumHashFunctions();
- this.numBits = (int) this.bitSet.bitSize();
- }
-}
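The deleted subclass is replaced by static helpers on the new org.apache.orc.util.BloomFilterIO added later in this patch. Going by the call sites in RecordReaderImpl and WriterImpl below, reconstructing a filter looks roughly like this sketch (the stream kind, writer version, and column category shown are illustrative):

    import org.apache.orc.OrcFile;
    import org.apache.orc.OrcProto;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.util.BloomFilter;
    import org.apache.orc.util.BloomFilterIO;

    public class BloomFilterReconstruct {
      // Argument order taken from the evaluatePredicateProto call site below;
      // the proto message is assumed to have been read from the row index.
      static BloomFilter fromProto(OrcProto.BloomFilter proto) {
        return BloomFilterIO.deserialize(
            OrcProto.Stream.Kind.BLOOM_FILTER_UTF8,  // which stream it came from
            OrcFile.WriterVersion.ORC_101,           // version that wrote the file
            TypeDescription.Category.STRING,         // the column's type
            proto);
      }
    }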
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/DataReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/DataReader.java b/java/core/src/java/org/apache/orc/DataReader.java
index a5dbb76..b3f91f2 100644
--- a/java/core/src/java/org/apache/orc/DataReader.java
+++ b/java/core/src/java/org/apache/orc/DataReader.java
@@ -31,9 +31,14 @@ public interface DataReader extends AutoCloseable {
void open() throws IOException;
OrcIndex readRowIndex(StripeInformation stripe,
+ TypeDescription fileSchema,
OrcProto.StripeFooter footer,
- boolean[] included, OrcProto.RowIndex[] indexes,
+ boolean ignoreNonUtf8BloomFilter,
+ boolean[] included,
+ OrcProto.RowIndex[] indexes,
boolean[] sargColumns,
+ OrcFile.WriterVersion version,
+ OrcProto.Stream.Kind[] bloomFilterKinds,
OrcProto.BloomFilterIndex[] bloomFilterIndices
) throws IOException;
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index ac8e3f0..05ab13b 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -105,6 +105,16 @@ public enum OrcConf {
"dictionary or not will be retained thereafter."),
BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns", "orc.bloom.filter.columns",
"", "List of columns to create bloom filters for when writing."),
+ BLOOM_FILTER_WRITE_VERSION("orc.bloom.filter.write.version",
+ "orc.bloom.filter.write.version", OrcFile.BloomFilterVersion.UTF8.toString(),
+ "Which version of the bloom filters should we write.\n" +
+ "The choices are:\n" +
+ " original - writes two versions of the bloom filters for use by\n" +
+ " both old and new readers.\n" +
+ " utf8 - writes just the new bloom filters."),
+ IGNORE_NON_UTF8_BLOOM_FILTERS("orc.bloom.filter.ignore.non-utf8",
+ "orc.bloom.filter.ignore.non-utf8", false,
+ "Should the reader ignore the obsolete non-UTF8 bloom filters."),
MAX_FILE_LENGTH("orc.max.file.length", "orc.max.file.length", Long.MAX_VALUE,
"The maximum size of the file to read for finding the file tail. This\n" +
"is primarily used for streaming ingest to read intermediate\n" +
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index ddfa9f7..6b2d48e 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -108,6 +108,7 @@ public class OrcFile {
HIVE_4243(2), // use real column names from Hive tables
HIVE_12055(3), // vectorized writer
HIVE_13083(4), // decimal writer updating present stream wrongly
+ ORC_101(5), // bloom filters use utf8
// Don't use any magic numbers here except for the below:
FUTURE(Integer.MAX_VALUE); // a version from a future writer
@@ -144,8 +145,12 @@ public class OrcFile {
if (val == FUTURE.id) return FUTURE; // Special handling for the magic value.
return values[val];
}
+
+ public boolean includes(WriterVersion other) {
+ return id >= other.id;
+ }
}
- public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083;
+ public static final WriterVersion CURRENT_WRITER = WriterVersion.ORC_101;
public enum EncodingStrategy {
SPEED, COMPRESSION
@@ -231,6 +236,33 @@ public class OrcFile {
void preFooterWrite(WriterContext context) throws IOException;
}
+ public static enum BloomFilterVersion {
+ // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+ // both old and new readers.
+ ORIGINAL("original"),
+ // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+ // See ORC-101
+ UTF8("utf8");
+
+ private final String id;
+ private BloomFilterVersion(String id) {
+ this.id = id;
+ }
+
+ public String toString() {
+ return id;
+ }
+
+ public static BloomFilterVersion fromString(String s) {
+ for (BloomFilterVersion version: values()) {
+ if (version.id.equals(s)) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unknown BloomFilterVersion " + s);
+ }
+ }
+
/**
* Options for creating ORC file writers.
*/
@@ -253,6 +285,7 @@ public class OrcFile {
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
+ private BloomFilterVersion bloomFilterVersion;
protected WriterOptions(Properties tableProperties, Configuration conf) {
configuration = conf;
@@ -286,6 +319,10 @@ public class OrcFile {
conf);
bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties,
conf);
+ bloomFilterVersion =
+ BloomFilterVersion.fromString(
+ OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties,
+ conf));
}
/**
@@ -430,6 +467,14 @@ public class OrcFile {
}
/**
+ * Set the version of the bloom filters to write.
+ */
+ public WriterOptions bloomFilterVersion(BloomFilterVersion version) {
+ this.bloomFilterVersion = version;
+ return this;
+ }
+
+ /**
* A package local option to set the memory manager.
*/
protected WriterOptions memory(MemoryManager value) {
@@ -508,6 +553,10 @@ public class OrcFile {
public double getBloomFilterFpp() {
return bloomFilterFpp;
}
+
+ public BloomFilterVersion getBloomFilterVersion() {
+ return bloomFilterVersion;
+ }
}
/**
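A sketch of how the new writer option and version check fit together (the column name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.OrcFile;

    public class BloomFilterVersionDemo {
      // Writer side: pin the bloom filter stream version explicitly.
      static OrcFile.WriterOptions options(Configuration conf) {
        return OrcFile.writerOptions(conf)
            .bloomFilterColumns("name")   // hypothetical column name
            .bloomFilterVersion(OrcFile.BloomFilterVersion.ORIGINAL);
      }

      // Reader side: gate on the writer version recorded in the file tail.
      static boolean hasUtf8BloomFilters(OrcFile.WriterVersion fileVersion) {
        // includes() is simply id >= other.id, so this is true for ORC_101
        // and every later version.
        return fileVersion.includes(OrcFile.WriterVersion.ORC_101);
      }
    }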
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/TypeDescription.java b/java/core/src/java/org/apache/orc/TypeDescription.java
index da9fe49..bc6787d 100644
--- a/java/core/src/java/org/apache/orc/TypeDescription.java
+++ b/java/core/src/java/org/apache/orc/TypeDescription.java
@@ -842,4 +842,30 @@ public class TypeDescription
printJsonToBuffer("", buffer, 0);
return buffer.toString();
}
+
+ /**
+ * Locate a subtype by its id.
+ * @param goal the column id to look for
+ * @return the subtype
+ */
+ public TypeDescription findSubtype(int goal) {
+ // call getId method to make sure the ids are assigned
+ int id = getId();
+ if (goal < id || goal > maxId) {
+ throw new IllegalArgumentException("Unknown type id " + goal + " in " +
+ toJson());
+ }
+ if (goal == id) {
+ return this;
+ } else {
+ TypeDescription prev = null;
+ for(TypeDescription next: children) {
+ if (next.id > goal) {
+ return prev.findSubtype(goal);
+ }
+ prev = next;
+ }
+ return prev.findSubtype(goal);
+ }
+ }
}
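A quick sketch of findSubtype, assuming the TypeDescription.fromString factory; column ids follow a pre-order walk of the schema:

    import org.apache.orc.TypeDescription;

    public class FindSubtypeDemo {
      public static void main(String[] args) {
        TypeDescription schema =
            TypeDescription.fromString("struct<name:string,age:int>");
        // ids follow a pre-order walk: 0 = the struct, 1 = name, 2 = age
        System.out.println(schema.findSubtype(2).getCategory());  // INT
      }
    }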
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
index 36b9a20..20e0faa 100644
--- a/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java
@@ -1408,7 +1408,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
public void setConvertVectorElement(int elementNum) {
long longValue = longColVector.vector[elementNum];
String string = anyIntegerAsLongTreeReader.getString(longValue);
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
}
@@ -1450,7 +1450,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
float floatValue = (float) doubleColVector.vector[elementNum];
if (!Float.isNaN(floatValue)) {
String string = String.valueOf(floatValue);
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
} else {
bytesColVector.noNulls = false;
@@ -1495,7 +1495,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
double doubleValue = doubleColVector.vector[elementNum];
if (!Double.isNaN(doubleValue)) {
String string = String.valueOf(doubleValue);
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
} else {
bytesColVector.noNulls = false;
@@ -1544,7 +1544,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
@Override
public void setConvertVectorElement(int elementNum) {
String string = decimalColVector.vector[elementNum].getHiveDecimal().toString();
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
}
@@ -1584,7 +1584,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
public void setConvertVectorElement(int elementNum) throws IOException {
String string =
timestampColVector.asScratchTimestamp(elementNum).toString();
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
}
@@ -1626,7 +1626,7 @@ public class ConvertTreeReaderFactory extends TreeReaderFactory {
public void setConvertVectorElement(int elementNum) throws IOException {
date.setTime(DateWritable.daysToMillis((int) longColVector.vector[elementNum]));
String string = date.toString();
- byte[] bytes = string.getBytes();
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
assignStringGroupVectorEntry(bytesColVector, elementNum, readerType, bytes);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcIndex.java b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
index 50a15f2..edcb3ba 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcIndex.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
@@ -22,10 +22,14 @@ import org.apache.orc.OrcProto;
public final class OrcIndex {
OrcProto.RowIndex[] rowGroupIndex;
+ OrcProto.Stream.Kind[] bloomFilterKinds;
OrcProto.BloomFilterIndex[] bloomFilterIndex;
- public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+ public OrcIndex(OrcProto.RowIndex[] rgIndex,
+ OrcProto.Stream.Kind[] bloomFilterKinds,
+ OrcProto.BloomFilterIndex[] bfIndex) {
this.rowGroupIndex = rgIndex;
+ this.bloomFilterKinds = bloomFilterKinds;
this.bloomFilterIndex = bfIndex;
}
@@ -37,6 +41,10 @@ public final class OrcIndex {
return bloomFilterIndex;
}
+ public OrcProto.Stream.Kind[] getBloomFilterKinds() {
+ return bloomFilterKinds;
+ }
+
public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
this.rowGroupIndex = rowGroupIndex;
}
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index e8ad54d..c7ce2bb 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -27,7 +27,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.orc.BloomFilterIO;
+import org.apache.orc.OrcFile;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
@@ -88,10 +90,13 @@ public class RecordReaderImpl implements RecordReader {
private final TreeReaderFactory.TreeReader reader;
private final OrcProto.RowIndex[] indexes;
private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+ private final OrcProto.Stream.Kind[] bloomFilterKind;
private final SargApplier sargApp;
// an array about which row groups aren't skipped
private boolean[] includedRowGroups = null;
private final DataReader dataReader;
+ private final boolean ignoreNonUtf8BloomFilter;
+ private final OrcFile.WriterVersion writerVersion;
/**
* Given a list of column names, find the given column and return the index.
@@ -134,6 +139,7 @@ public class RecordReaderImpl implements RecordReader {
protected RecordReaderImpl(ReaderImpl fileReader,
Reader.Options options) throws IOException {
this.included = options.getInclude();
+ this.writerVersion = fileReader.getWriterVersion();
included[0] = true;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
@@ -162,11 +168,14 @@ public class RecordReaderImpl implements RecordReader {
this.types = fileReader.types;
this.bufferSize = fileReader.bufferSize;
this.rowIndexStride = fileReader.rowIndexStride;
+ this.ignoreNonUtf8BloomFilter =
+ OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
SearchArgument sarg = options.getSearchArgument();
if (sarg != null && rowIndexStride != 0) {
sargApp = new SargApplier(sarg, options.getColumnNames(),
rowIndexStride,
- included.length, evolution);
+ included.length, evolution,
+ writerVersion);
} else {
sargApp = null;
}
@@ -218,6 +227,7 @@ public class RecordReaderImpl implements RecordReader {
writerIncluded = evolution.getFileIncluded();
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+ bloomFilterKind = new OrcProto.Stream.Kind[types.size()];
advanceToNextRow(reader, 0L, true);
}
@@ -339,20 +349,23 @@ public class RecordReaderImpl implements RecordReader {
* that is referenced in the predicate.
* @param statsProto the statistics for the column mentioned in the predicate
* @param predicate the leaf predicate we need to evaluation
- * @param bloomFilter
+ * @param bloomFilter the bloom filter
+ * @param writerVersion the version of software that wrote the file
+ * @param type what is the kind of this column
* @return the set of truth values that may be returned for the given
* predicate.
*/
static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
- PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
+ PredicateLeaf predicate,
+ OrcProto.Stream.Kind kind,
+ OrcProto.BloomFilter bloomFilter,
+ OrcFile.WriterVersion writerVersion,
+ TypeDescription.Category type) {
ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
Object minValue = getMin(cs);
Object maxValue = getMax(cs);
- BloomFilterIO bf = null;
- if (bloomFilter != null) {
- bf = new BloomFilterIO(bloomFilter);
- }
- return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
+ return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(),
+ BloomFilterIO.deserialize(kind, writerVersion, type, bloomFilter));
}
/**
@@ -365,14 +378,14 @@ public class RecordReaderImpl implements RecordReader {
*/
public static TruthValue evaluatePredicate(ColumnStatistics stats,
PredicateLeaf predicate,
- BloomFilterIO bloomFilter) {
+ BloomFilter bloomFilter) {
Object minValue = getMin(stats);
Object maxValue = getMax(stats);
return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
}
static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
- Object max, boolean hasNull, BloomFilterIO bloomFilter) {
+ Object max, boolean hasNull, BloomFilter bloomFilter) {
// if we didn't have any values, everything must have been null
if (min == null) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
@@ -421,7 +434,7 @@ public class RecordReaderImpl implements RecordReader {
}
private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
- TruthValue result, BloomFilterIO bloomFilter) {
+ TruthValue result, BloomFilter bloomFilter) {
// evaluate bloom filter only when
// 1) Bloom filter is available
// 2) Min/Max evaluation yield YES or MAYBE
@@ -531,7 +544,7 @@ public class RecordReaderImpl implements RecordReader {
}
private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
- final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
+ final Object predObj, BloomFilter bloomFilter, boolean hasNull) {
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
// null safe equals does not return *_NULL variant. So set hasNull to false
@@ -553,7 +566,7 @@ public class RecordReaderImpl implements RecordReader {
}
}
- private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
+ private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull) {
TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
if (predObj instanceof Long) {
@@ -708,6 +721,7 @@ public class RecordReaderImpl implements RecordReader {
public final static boolean[] READ_ALL_RGS = null;
public final static boolean[] READ_NO_RGS = new boolean[0];
+ private final OrcFile.WriterVersion writerVersion;
private final SearchArgument sarg;
private final List<PredicateLeaf> sargLeaves;
private final int[] filterColumns;
@@ -716,10 +730,13 @@ public class RecordReaderImpl implements RecordReader {
private final boolean[] sargColumns;
private SchemaEvolution evolution;
- public SargApplier(SearchArgument sarg, String[] columnNames,
+ public SargApplier(SearchArgument sarg,
+ String[] columnNames,
long rowIndexStride,
int includedCount,
- SchemaEvolution evolution) {
+ SchemaEvolution evolution,
+ OrcFile.WriterVersion writerVersion) {
+ this.writerVersion = writerVersion;
this.sarg = sarg;
sargLeaves = sarg.getLeaves();
filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves,
@@ -745,8 +762,11 @@ public class RecordReaderImpl implements RecordReader {
* row groups must be read.
* @throws IOException
*/
- public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
- OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
+ public boolean[] pickRowGroups(StripeInformation stripe,
+ OrcProto.RowIndex[] indexes,
+ OrcProto.Stream.Kind[] bloomFilterKinds,
+ OrcProto.BloomFilterIndex[] bloomFilterIndices,
+ boolean returnNone) throws IOException {
long rowsInStripe = stripe.getNumberOfRows();
int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
@@ -765,11 +785,15 @@ public class RecordReaderImpl implements RecordReader {
}
OrcProto.ColumnStatistics stats = entry.getStatistics();
OrcProto.BloomFilter bf = null;
+ OrcProto.Stream.Kind bfk = null;
if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
+ bfk = bloomFilterKinds[columnIx];
bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
}
if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
- leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+ leafValues[pred] = evaluatePredicateProto(stats,
+ sargLeaves.get(pred), bfk, bf, writerVersion,
+ evolution.getFileSchema().findSubtype(columnIx).getCategory());
} else {
leafValues[pred] = TruthValue.YES_NO_NULL;
}
@@ -809,7 +833,8 @@ public class RecordReaderImpl implements RecordReader {
return null;
}
readRowIndex(currentStripe, writerIncluded, sargApp.sargColumns);
- return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
+ return sargApp.pickRowGroups(stripes.get(currentStripe), indexes,
+ bloomFilterKind, bloomFilterIndices, false);
}
private void clearStreams() {
@@ -1168,8 +1193,9 @@ public class RecordReaderImpl implements RecordReader {
sargColumns = sargColumns == null ?
(sargApp == null ? null : sargApp.sargColumns) : sargColumns;
}
- return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
- bloomFilterIndex);
+ return dataReader.readRowIndex(stripe, evolution.getFileType(0), stripeFooter,
+ ignoreNonUtf8BloomFilter, included, indexes, sargColumns, writerVersion,
+ bloomFilterKind, bloomFilterIndex);
}
private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
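checkInBloomFilter probes the filter with the predicate constant. The probe side mirrors the add side; a sketch, assuming the test* methods carried over from the original Hive class this patch ports into org.apache.orc.util:

    import org.apache.orc.util.BloomFilter;

    public class BloomProbeDemo {
      public static void main(String[] args) {
        BloomFilter bf = new BloomFilter(10000, 0.05);
        bf.addLong(42L);
        bf.addString("orc");
        System.out.println(bf.testLong(42L));     // true
        System.out.println(bf.testString("zzz")); // false with high probability
      }
    }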
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 3d57732..cadee35 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
+import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
/**
* Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
@@ -44,6 +44,100 @@ import org.apache.orc.StripeInformation;
public class RecordReaderUtils {
private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+ static boolean hadBadBloomFilters(TypeDescription.Category category,
+ OrcFile.WriterVersion version) {
+ switch(category) {
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return !version.includes(OrcFile.WriterVersion.HIVE_12055);
+ case DECIMAL:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Plans the list of disk ranges that the given stripe needs to read the
+ * indexes. All of the positions are relative to the start of the stripe.
+ * @param fileSchema the schema for the file
+ * @param footer the stripe footer
+ * @param ignoreNonUtf8BloomFilter should the reader ignore non-utf8
+ * encoded bloom filters
+ * @param fileIncluded the columns (indexed by file columns) that should be
+ * read
+ * @param sargColumns true for the columns (indexed by file columns) that
+ * we need bloom filters for
+ * @param version the version of the software that wrote the file
+ * @param bloomFilterKinds (output) the stream kind of the bloom filters
+ * @return a list of merged disk ranges to read
+ */
+ static DiskRangeList planIndexReading(TypeDescription fileSchema,
+ OrcProto.StripeFooter footer,
+ boolean ignoreNonUtf8BloomFilter,
+ boolean[] fileIncluded,
+ boolean[] sargColumns,
+ OrcFile.WriterVersion version,
+ OrcProto.Stream.Kind[] bloomFilterKinds) {
+ DiskRangeList.CreateHelper result = new DiskRangeList.CreateHelper();
+ List<OrcProto.Stream> streams = footer.getStreamsList();
+ // figure out which kind of bloom filter we want for each column
+ // picks bloom_filter_utf8 if it's available, otherwise bloom_filter
+ if (sargColumns != null) {
+ for (OrcProto.Stream stream : streams) {
+ if (stream.hasKind() && stream.hasColumn()) {
+ int column = stream.getColumn();
+ if (sargColumns[column]) {
+ switch (stream.getKind()) {
+ case BLOOM_FILTER:
+ if (bloomFilterKinds[column] == null &&
+ !(ignoreNonUtf8BloomFilter &&
+ hadBadBloomFilters(fileSchema.findSubtype(column)
+ .getCategory(), version))) {
+ bloomFilterKinds[column] = OrcProto.Stream.Kind.BLOOM_FILTER;
+ }
+ break;
+ case BLOOM_FILTER_UTF8:
+ bloomFilterKinds[column] = OrcProto.Stream.Kind.BLOOM_FILTER_UTF8;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ }
+ long offset = 0;
+ for(OrcProto.Stream stream: footer.getStreamsList()) {
+ if (stream.hasKind() && stream.hasColumn()) {
+ int column = stream.getColumn();
+ if (fileIncluded == null || fileIncluded[column]) {
+ boolean needStream = false;
+ switch (stream.getKind()) {
+ case ROW_INDEX:
+ needStream = true;
+ break;
+ case BLOOM_FILTER:
+ needStream = bloomFilterKinds[column] == OrcProto.Stream.Kind.BLOOM_FILTER;
+ break;
+ case BLOOM_FILTER_UTF8:
+ needStream = bloomFilterKinds[column] == OrcProto.Stream.Kind.BLOOM_FILTER_UTF8;
+ break;
+ default:
+ // PASS
+ break;
+ }
+ if (needStream) {
+ result.addOrMerge(offset, offset + stream.getLength(), true, false);
+ }
+ }
+ }
+ offset += stream.getLength();
+ }
+ return result.get();
+ }
+
private static class DefaultDataReader implements DataReader {
private FSDataInputStream file = null;
private final ByteBufferAllocatorPool pool;
@@ -91,10 +185,14 @@ public class RecordReaderUtils {
@Override
public OrcIndex readRowIndex(StripeInformation stripe,
+ TypeDescription fileSchema,
OrcProto.StripeFooter footer,
+ boolean ignoreNonUtf8BloomFilter,
boolean[] included,
OrcProto.RowIndex[] indexes,
boolean[] sargColumns,
+ OrcFile.WriterVersion version,
+ OrcProto.Stream.Kind[] bloomFilterKinds,
OrcProto.BloomFilterIndex[] bloomFilterIndices
) throws IOException {
if (file == null) {
@@ -106,49 +204,61 @@ public class RecordReaderUtils {
if (indexes == null) {
indexes = new OrcProto.RowIndex[typeCount];
}
+ if (bloomFilterKinds == null) {
+ bloomFilterKinds = new OrcProto.Stream.Kind[typeCount];
+ }
if (bloomFilterIndices == null) {
bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
}
- long offset = stripe.getOffset();
- List<OrcProto.Stream> streams = footer.getStreamsList();
- for (int i = 0; i < streams.size(); i++) {
- OrcProto.Stream stream = streams.get(i);
- OrcProto.Stream nextStream = null;
- if (i < streams.size() - 1) {
- nextStream = streams.get(i+1);
+ DiskRangeList ranges = planIndexReading(fileSchema, footer,
+ ignoreNonUtf8BloomFilter, included, sargColumns, version,
+ bloomFilterKinds);
+ ranges = readDiskRanges(file, zcr, stripe.getOffset(), ranges, false);
+ long offset = 0;
+ DiskRangeList range = ranges;
+ for(OrcProto.Stream stream: footer.getStreamsList()) {
+ // advance to find the next range
+ while (range != null && range.getEnd() <= offset) {
+ range = range.next;
}
- int col = stream.getColumn();
- int len = (int) stream.getLength();
- // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
- // filter and combine the io to read row index and bloom filters for that column together
- if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
- boolean readBloomFilter = false;
- if (sargColumns != null && sargColumns[col] &&
- nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
- len += nextStream.getLength();
- i += 1;
- readBloomFilter = true;
- }
- if ((included == null || included[col]) && indexes[col] == null) {
- byte[] buffer = new byte[len];
- file.readFully(offset, buffer, 0, buffer.length);
- ByteBuffer bb = ByteBuffer.wrap(buffer);
- indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- ReaderImpl.singleton(new BufferChunk(bb, 0)), stream.getLength(),
- codec, bufferSize));
- if (readBloomFilter) {
- bb.position((int) stream.getLength());
- bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
- "bloom_filter", ReaderImpl.singleton(new BufferChunk(bb, 0)),
- nextStream.getLength(), codec, bufferSize));
- }
+ // no more ranges, so we are done
+ if (range == null) {
+ break;
+ }
+ int column = stream.getColumn();
+ if (stream.hasKind() && range.getOffset() <= offset) {
+ switch (stream.getKind()) {
+ case ROW_INDEX:
+ if (included == null || included[column]) {
+ ByteBuffer bb = range.getData().duplicate();
+ bb.position((int) (offset - range.getOffset()));
+ bb.limit((int) (bb.position() + stream.getLength()));
+ indexes[column] = OrcProto.RowIndex.parseFrom(
+ InStream.createCodedInputStream("index",
+ ReaderImpl.singleton(new BufferChunk(bb, 0)),
+ stream.getLength(),
+ codec, bufferSize));
+ }
+ break;
+ case BLOOM_FILTER:
+ case BLOOM_FILTER_UTF8:
+ if (sargColumns != null && sargColumns[column]) {
+ ByteBuffer bb = range.getData().duplicate();
+ bb.position((int) (offset - range.getOffset()));
+ bb.limit((int) (bb.position() + stream.getLength()));
+ bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
+ (InStream.createCodedInputStream("bloom_filter",
+ ReaderImpl.singleton(new BufferChunk(bb, 0)),
+ stream.getLength(), codec, bufferSize));
+ }
+ break;
+ default:
+ break;
}
}
- offset += len;
+ offset += stream.getLength();
}
-
- OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
- return index;
+ return new OrcIndex(indexes, bloomFilterKinds, bloomFilterIndices);
}
@Override
@@ -234,14 +344,14 @@ public class RecordReaderUtils {
}
public static void addEntireStreamToRanges(
- long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+ long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
list.addOrMerge(offset, offset + length, doMergeBuffers, false);
}
public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
- long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+ long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
for (int group = 0; group < includedRowGroups.length; ++group) {
if (!includedRowGroups[group]) continue;
int posn = getIndexPosition(
@@ -399,7 +509,7 @@ public class RecordReaderUtils {
if (range == null) return null;
DiskRangeList prev = range.prev;
if (prev == null) {
- prev = new MutateHelper(range);
+ prev = new DiskRangeList.MutateHelper(range);
}
while (range != null) {
if (range.hasData()) {
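The kind-selection loop in planIndexReading above always prefers BLOOM_FILTER_UTF8 and falls back to the old BLOOM_FILTER only when it is not being ignored as known-bad for the column type. Restated as a tiny helper (a sketch, not the actual ORC code):

    import org.apache.orc.OrcProto;

    public class BloomFilterKindChoice {
      // hasUtf8/hasOld: which stream kinds the stripe footer lists for the
      // column; skipOld: ignoreNonUtf8BloomFilter && hadBadBloomFilters(...).
      static OrcProto.Stream.Kind pick(boolean hasUtf8, boolean hasOld,
                                       boolean skipOld) {
        if (hasUtf8) {
          return OrcProto.Stream.Kind.BLOOM_FILTER_UTF8;  // always preferred
        }
        if (hasOld && !skipOld) {
          return OrcProto.Stream.Kind.BLOOM_FILTER;       // usable fallback
        }
        return null;  // no trustworthy filter for this column
      }
    }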
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index 1e11728..20adfd8 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -153,6 +153,10 @@ public class SchemaEvolution {
return hasConversion;
}
+ public TypeDescription getFileSchema() {
+ return fileSchema;
+ }
+
public TypeDescription getFileType(TypeDescription readerType) {
return getFileType(readerType.getId());
}
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/StreamName.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/StreamName.java b/java/core/src/java/org/apache/orc/impl/StreamName.java
index b3fd145..e3561bf 100644
--- a/java/core/src/java/org/apache/orc/impl/StreamName.java
+++ b/java/core/src/java/org/apache/orc/impl/StreamName.java
@@ -78,6 +78,7 @@ public class StreamName implements Comparable<StreamName> {
case ROW_INDEX:
case DICTIONARY_COUNT:
case BLOOM_FILTER:
+ case BLOOM_FILTER_UTF8:
return Area.INDEX;
default:
return Area.DATA;
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 3df1b76..940ef59 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -21,6 +21,7 @@ package org.apache.orc.impl;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,12 +35,10 @@ import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
-import io.airlift.compress.snappy.SnappyCompressor;
-import io.airlift.compress.snappy.SnappyDecompressor;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.BloomFilterIO;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
@@ -50,6 +49,7 @@ import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.util.BloomFilterUtf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -147,6 +147,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final OrcFile.CompressionStrategy compressionStrategy;
private final boolean[] bloomFilterColumns;
private final double bloomFilterFpp;
+ private final OrcFile.BloomFilterVersion bloomFilterVersion;
private boolean writeTimeZone;
public WriterImpl(FileSystem fs,
@@ -157,6 +158,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.conf = opts.getConfiguration();
this.callback = opts.getCallback();
this.schema = opts.getSchema();
+ bloomFilterVersion = opts.getBloomFilterVersion();
if (callback != null) {
callbackContext = new OrcFile.WriterContext(){
@@ -426,6 +428,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
case BLOOM_FILTER:
case DATA:
case DICTIONARY_DATA:
+ case BLOOM_FILTER_UTF8:
if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
CompressionCodec.Modifier.TEXT);
@@ -543,6 +546,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public boolean hasWriterTimeZone() {
return writeTimeZone;
}
+
+ public OrcFile.BloomFilterVersion getBloomFilterVersion() {
+ return bloomFilterVersion;
+ }
}
/**
@@ -564,9 +571,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
private final PositionedOutputStream rowIndexStream;
private final PositionedOutputStream bloomFilterStream;
- protected final BloomFilterIO bloomFilter;
+ private final PositionedOutputStream bloomFilterStreamUtf8;
+ protected final BloomFilter bloomFilter;
+ protected final BloomFilterUtf8 bloomFilterUtf8;
protected final boolean createBloomFilter;
private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+ private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
private final OrcProto.BloomFilter.Builder bloomFilterEntry;
private boolean foundNulls;
private OutStream isPresentOutStream;
@@ -612,15 +622,30 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
if (createBloomFilter) {
bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
- bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
- bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+ if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
+ bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+ streamFactory.getBloomFilterFPP());
+ bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+ bloomFilterStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.BLOOM_FILTER);
+ } else {
+ bloomFilter = null;
+ bloomFilterIndex = null;
+ bloomFilterStream = null;
+ }
+ bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
streamFactory.getBloomFilterFPP());
+ bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
+ bloomFilterStreamUtf8 = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.BLOOM_FILTER_UTF8);
} else {
bloomFilterEntry = null;
bloomFilterIndex = null;
+ bloomFilterIndexUtf8 = null;
+ bloomFilterStreamUtf8 = null;
bloomFilterStream = null;
bloomFilter = null;
+ bloomFilterUtf8 = null;
}
}
@@ -788,7 +813,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
bloomFilterIndex.build().writeTo(bloomFilterStream);
bloomFilterStream.flush();
bloomFilterIndex.clear();
- bloomFilterEntry.clear();
+ }
+ // write the bloom filter to out stream
+ if (bloomFilterStreamUtf8 != null) {
+ bloomFilterIndexUtf8.build().writeTo(bloomFilterStreamUtf8);
+ bloomFilterStreamUtf8.flush();
+ bloomFilterIndexUtf8.clear();
}
}
@@ -837,12 +867,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
void addBloomFilterEntry() {
if (createBloomFilter) {
- bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
- bloomFilterEntry.addAllBitset(Arrays.asList(ArrayUtils.toObject(
- bloomFilter.getBitSet())));
- bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
- bloomFilter.reset();
- bloomFilterEntry.clear();
+ if (bloomFilter != null) {
+ BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
+ bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+ bloomFilter.reset();
+ }
+ if (bloomFilterUtf8 != null) {
+ BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
+ bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
+ bloomFilterUtf8.reset();
+ }
}
}
@@ -946,7 +980,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
byte value = (byte) vec.vector[0];
indexStatistics.updateInteger(value, length);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
for(int i=0; i < length; ++i) {
writer.write(value);
@@ -959,7 +996,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
writer.write(value);
indexStatistics.updateInteger(value, 1);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
}
}
@@ -1017,7 +1057,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
long value = vec.vector[0];
indexStatistics.updateInteger(value, length);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
for(int i=0; i < length; ++i) {
writer.write(value);
@@ -1030,7 +1073,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
writer.write(value);
indexStatistics.updateInteger(value, 1);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
}
}
@@ -1077,7 +1123,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
float value = (float) vec.vector[0];
indexStatistics.updateDouble(value);
if (createBloomFilter) {
- bloomFilter.addDouble(value);
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
}
for(int i=0; i < length; ++i) {
utils.writeFloat(stream, value);
@@ -1090,7 +1139,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
utils.writeFloat(stream, value);
indexStatistics.updateDouble(value);
if (createBloomFilter) {
- bloomFilter.addDouble(value);
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
}
}
}
@@ -1138,7 +1190,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
double value = vec.vector[0];
indexStatistics.updateDouble(value);
if (createBloomFilter) {
- bloomFilter.addDouble(value);
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
}
for(int i=0; i < length; ++i) {
utils.writeDouble(stream, value);
@@ -1151,7 +1206,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
utils.writeDouble(stream, value);
indexStatistics.updateDouble(value);
if (createBloomFilter) {
- bloomFilter.addDouble(value);
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
}
}
}
@@ -1430,7 +1488,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateString(vec.vector[0], vec.start[0],
vec.length[0], length);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[0], vec.start[0],
+ vec.length[0], StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
}
}
} else {
@@ -1447,7 +1510,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateString(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i], 1);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i],
+ StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i]);
}
}
@@ -1504,7 +1573,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[0], vec.start[0],
+ vec.length[0], StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
}
}
} else {
@@ -1531,7 +1605,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i],
+ StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
}
}
}
@@ -1576,7 +1657,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateString(vec.vector[0], vec.start[0],
itemLength, length);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[0],
+ vec.start[0], itemLength,
+ StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[0],
+ vec.start[0], itemLength);
}
}
} else {
@@ -1594,7 +1682,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateString(vec.vector[offset + i],
vec.start[offset + i], itemLength, 1);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
+ if (bloomFilter != null) {
+ // translate from UTF-8 to the default charset
+ bloomFilter.addString(new String(vec.vector[offset + i],
+ vec.start[offset + i], itemLength,
+ StandardCharsets.UTF_8));
+ }
+ bloomFilterUtf8.addBytes(vec.vector[offset + i],
vec.start[offset + i], itemLength);
}
}
@@ -1646,7 +1740,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateBinary(vec.vector[0], vec.start[0],
vec.length[0], length);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ if (bloomFilter != null) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
}
}
} else {
@@ -1658,7 +1755,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics.updateBinary(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i], 1);
if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
+ if (bloomFilter != null) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ bloomFilterUtf8.addBytes(vec.vector[offset + i],
vec.start[offset + i], vec.length[offset + i]);
}
}
@@ -1734,7 +1835,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
long millis = val.getTime();
indexStatistics.updateTimestamp(millis);
if (createBloomFilter) {
- bloomFilter.addLong(millis);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(millis);
+ }
+ bloomFilterUtf8.addLong(millis);
}
final long secs = millis / MILLIS_PER_SECOND - base_timestamp;
final long nano = formatNanos(val.getNanos());
@@ -1753,7 +1857,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
nanos.write(formatNanos(val.getNanos()));
indexStatistics.updateTimestamp(millis);
if (createBloomFilter) {
- bloomFilter.addLong(millis);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(millis);
+ }
+ bloomFilterUtf8.addLong(millis);
}
}
}
@@ -1819,7 +1926,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int value = (int) vec.vector[0];
indexStatistics.updateDate(value);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
for(int i=0; i < length; ++i) {
writer.write(value);
@@ -1832,7 +1942,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
writer.write(value);
indexStatistics.updateDate(value);
if (createBloomFilter) {
- bloomFilter.addLong(value);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
}
}
}
@@ -1901,7 +2014,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
HiveDecimal value = vec.vector[0].getHiveDecimal();
indexStatistics.updateDecimal(value);
if (createBloomFilter) {
- bloomFilter.addString(value.toString());
+ String str = value.toString();
+ if (bloomFilter != null) {
+ bloomFilter.addString(str);
+ }
+ bloomFilterUtf8.addString(str);
}
for(int i=0; i < length; ++i) {
SerializationUtils.writeBigInteger(valueStream,
@@ -1918,7 +2035,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
scaleStream.write(value.scale());
indexStatistics.updateDecimal(value);
if (createBloomFilter) {
- bloomFilter.addString(value.toString());
+ String str = value.toString();
+ if (bloomFilter != null) {
+ bloomFilter.addString(str);
+ }
+ bloomFilterUtf8.addString(str);
}
}
}
@@ -2065,7 +2186,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
}
if (createBloomFilter) {
- bloomFilter.addLong(childLength);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(childLength);
+ }
+ bloomFilterUtf8.addLong(childLength);
}
}
} else {
@@ -2088,6 +2212,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
currentLength += nextLength;
}
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(nextLength);
+ }
+ bloomFilterUtf8.addLong(nextLength);
+ }
}
}
if (currentLength != 0) {
@@ -2161,7 +2291,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
}
if (createBloomFilter) {
- bloomFilter.addLong(childLength);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(childLength);
+ }
+ bloomFilterUtf8.addLong(childLength);
}
}
} else {
@@ -2186,6 +2319,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
currentLength += nextLength;
}
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(nextLength);
+ }
+ bloomFilterUtf8.addLong(nextLength);
+ }
}
}
if (currentLength != 0) {
@@ -2247,7 +2386,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
tags.write(tag);
}
if (createBloomFilter) {
- bloomFilter.addLong(tag);
+ if (bloomFilter != null) {
+ bloomFilter.addLong(tag);
+ }
+ bloomFilterUtf8.addLong(tag);
}
childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
}
@@ -2275,6 +2417,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
currentStart[tag] = i + offset;
currentLength[tag] = 1;
}
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(tag);
+ }
+ bloomFilterUtf8.addLong(tag);
+ }
}
}
// write out any left over sequences
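From the caller's side the writing API is unchanged; requesting bloom filters is enough, and the version option controls which index streams are emitted. A sketch with a hypothetical path and schema:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class WriteWithBloomFilters {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Writer writer = OrcFile.createWriter(new Path("/tmp/people.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(TypeDescription.fromString("struct<name:string>"))
                .bloomFilterColumns("name")
                // UTF8 emits only BLOOM_FILTER_UTF8; ORIGINAL emits both kinds
                .bloomFilterVersion(OrcFile.BloomFilterVersion.UTF8));
        writer.close();
      }
    }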
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilter.java b/java/core/src/java/org/apache/orc/util/BloomFilter.java
new file mode 100644
index 0000000..0d9db24
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilter.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership check. BloomFilters are
+ * highly space efficient when compared to using a HashSet. Because of the probabilistic nature of
+ * a bloom filter, false positives (element not present in the bloom filter but test() says true) are
+ * possible, but false negatives are not (if an element is present then test() will never
+ * say false). The false positive probability is configurable (default: 5%), and the
+ * storage requirement changes accordingly: the lower the false positive probability,
+ * the greater the space requirement.
+ * Bloom filters are sensitive to the number of elements that will be inserted;
+ * the expected number of entries must be specified at creation. If the number
+ * of insertions exceeds the specified initial number of entries, the false positive probability
+ * will increase accordingly.
+ *
+ * Internally, this implementation of bloom filter uses Murmur3 fast non-cryptographic hash
+ * algorithm. Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash
+ * collisions for specific sequence of repeating bytes. Check the following link for more info
+ * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
+ *
+ * Note that this class is here for backwards compatibility, because it uses
+ * the JVM default character set for strings. All new users should
+ * BloomFilterUtf8, which always uses UTF8 for the encoding.
+ */
+public class BloomFilter {
+ public static final double DEFAULT_FPP = 0.05;
+ private final BitSet bitSet;
+ private final int numBits;
+ private final int numHashFunctions;
+
+ static void checkArgument(boolean expression, String message) {
+ if (!expression) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ public BloomFilter(long expectedEntries) {
+ this(expectedEntries, DEFAULT_FPP);
+ }
+
+ public BloomFilter(long expectedEntries, double fpp) {
+ checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+ checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
+ int nb = optimalNumOfBits(expectedEntries, fpp);
+ // make 'm' multiple of 64
+ this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+ this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
+ this.bitSet = new BitSet(numBits);
+ }
+
+ /**
+ * A constructor to support rebuilding the BloomFilter from a serialized representation.
+ * @param bits the serialized bits
+ * @param numFuncs the number of functions used
+ */
+ public BloomFilter(long[] bits, int numFuncs) {
+ super();
+ bitSet = new BitSet(bits);
+ this.numBits = (int) bitSet.bitSize();
+ numHashFunctions = numFuncs;
+ }
+
+ static int optimalNumOfHashFunctions(long n, long m) {
+ return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+ }
+
+ static int optimalNumOfBits(long n, double p) {
+ return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null &&
+ other.getClass() == getClass() &&
+ numBits == ((BloomFilter) other).numBits &&
+ numHashFunctions == ((BloomFilter) other).numHashFunctions &&
+ bitSet.equals(((BloomFilter) other).bitSet);
+ }
+
+ public void add(byte[] val) {
+ if (val == null) {
+ addBytes(val, -1, -1);
+ } else {
+ addBytes(val, 0, val.length);
+ }
+ }
+
+ public void addBytes(byte[] val, int offset, int length) {
+ // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
+ // by Kirsch et al. From the abstract: 'only two hash functions are necessary to effectively
+ // implement a Bloom filter without any loss in the asymptotic false positive probability'
+
+ // Let's split the 64-bit hashcode into two 32-bit hash codes and employ the technique
+ // mentioned in the above paper
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
+ addHash(hash64);
+ }
+
+ private void addHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ bitSet.set(pos);
+ }
+ }
+
+ public void addString(String val) {
+ if (val == null) {
+ add(null);
+ } else {
+ add(val.getBytes(Charset.defaultCharset()));
+ }
+ }
+
+ public void addLong(long val) {
+ addHash(getLongHash(val));
+ }
+
+ public void addDouble(double val) {
+ addLong(Double.doubleToLongBits(val));
+ }
+
+ public boolean test(byte[] val) {
+ if (val == null) {
+ return testBytes(val, -1, -1);
+ }
+ return testBytes(val, 0, val.length);
+ }
+
+ public boolean testBytes(byte[] val, int offset, int length) {
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
+ return testHash(hash64);
+ }
+
+ private boolean testHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ if (!bitSet.get(pos)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean testString(String val) {
+ if (val == null) {
+ return test(null);
+ } else {
+ return test(val.getBytes(Charset.defaultCharset()));
+ }
+ }
+
+ public boolean testLong(long val) {
+ return testHash(getLongHash(val));
+ }
+
+ // Thomas Wang's integer hash function
+ // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+ private long getLongHash(long key) {
+ key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+ key = key ^ (key >> 24);
+ key = (key + (key << 3)) + (key << 8); // key * 265
+ key = key ^ (key >> 14);
+ key = (key + (key << 2)) + (key << 4); // key * 21
+ key = key ^ (key >> 28);
+ key = key + (key << 31);
+ return key;
+ }
+
+ public boolean testDouble(double val) {
+ return testLong(Double.doubleToLongBits(val));
+ }
+
+ public long sizeInBytes() {
+ return getBitSize() / 8;
+ }
+
+ public int getBitSize() {
+ return bitSet.getData().length * Long.SIZE;
+ }
+
+ public int getNumHashFunctions() {
+ return numHashFunctions;
+ }
+
+ public long[] getBitSet() {
+ return bitSet.getData();
+ }
+
+ @Override
+ public String toString() {
+ return "m: " + numBits + " k: " + numHashFunctions;
+ }
+
+ /**
+ * Merge the specified bloom filter into the current bloom filter.
+ *
+ * @param that - bloom filter to merge
+ */
+ public void merge(BloomFilter that) {
+ if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
+ this.bitSet.putAll(that.bitSet);
+ } else {
+ throw new IllegalArgumentException("BloomFilters are not compatible for merging." +
+ " this - " + this.toString() + " that - " + that.toString());
+ }
+ }
+
+ public void reset() {
+ this.bitSet.clear();
+ }
+
+ /**
+ * Bare metal bit set implementation. For performance reasons, this implementation does not check
+ * for index bounds nor expand the bit set size if the specified index is greater than the size.
+ */
+ public static class BitSet {
+ private final long[] data;
+
+ public BitSet(long bits) {
+ this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
+ }
+
+ /**
+ * Deserialize long array as bit set.
+ *
+ * @param data - bit array
+ */
+ public BitSet(long[] data) {
+ assert data.length > 0 : "data length is zero!";
+ this.data = data;
+ }
+
+ /**
+ * Sets the bit at the specified index.
+ *
+ * @param index - position
+ */
+ public void set(int index) {
+ data[index >>> 6] |= (1L << index);
+ }
+
+ /**
+ * Returns true if the bit at the specified index is set.
+ *
+ * @param index - position
+ * @return - value at the bit position
+ */
+ public boolean get(int index) {
+ return (data[index >>> 6] & (1L << index)) != 0;
+ }
+
+ /**
+ * Number of bits
+ */
+ public long bitSize() {
+ return (long) data.length * Long.SIZE;
+ }
+
+ public long[] getData() {
+ return data;
+ }
+
+ /**
+ * Combines the two BitArrays using bitwise OR.
+ */
+ public void putAll(BitSet array) {
+ assert data.length == array.data.length :
+ "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
+ for (int i = 0; i < data.length; i++) {
+ data[i] |= array.data[i];
+ }
+ }
+
+ /**
+ * Clear the bit set.
+ */
+ public void clear() {
+ Arrays.fill(data, 0);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null &&
+ other.getClass() == getClass() &&
+ Arrays.equals(data, ((BitSet) other).data);
+ }
+ }
+}
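
A short usage sketch for the class above (illustration only, not part of the commit). The sizing follows the two helpers: for n expected entries and false positive rate p, optimalNumOfBits computes m = -n*ln(p)/(ln 2)^2 and optimalNumOfHashFunctions computes k = round(m/n * ln 2); with n = 10,000 and p = 0.05 that is about 62,352 bits, padded up to 62,400 (a multiple of 64), and k = 4 probes per value, each at position (hash1 + i*hash2) mod m:

    import org.apache.orc.util.BloomFilter;

    public class BloomFilterExample {
      public static void main(String[] args) {
        // n = 10,000, p = 5% -> m = 62,400 bits (padded), k = 4 hash functions
        BloomFilter filter = new BloomFilter(10000, 0.05);
        filter.addLong(42L);
        filter.addDouble(3.14);
        filter.addString("orc");   // note: hashed with the JVM default charset

        System.out.println(filter.testLong(42L));     // true: no false negatives
        System.out.println(filter.testString("orc")); // true
        System.out.println(filter.testLong(43L));     // false ~95% of the time
      }
    }
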
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilterIO.java b/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
new file mode 100644
index 0000000..a6c3940
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilterIO.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import com.google.protobuf.ByteString;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BloomFilterIO {
+
+ private BloomFilterIO() {
+ // never called
+ }
+
+ /**
+ * Deserialize a bloom filter from the ORC file.
+ */
+ public static BloomFilter deserialize(OrcProto.Stream.Kind kind,
+ OrcFile.WriterVersion fileVersion,
+ TypeDescription.Category type,
+ OrcProto.BloomFilter bloomFilter) {
+ if (bloomFilter == null) {
+ return null;
+ }
+ int numFuncs = bloomFilter.getNumHashFunctions();
+ switch (kind) {
+ case BLOOM_FILTER: {
+ long values[] = new long[bloomFilter.getBitsetCount()];
+ for (int i = 0; i < values.length; ++i) {
+ values[i] = bloomFilter.getBitset(i);
+ }
+ // After HIVE-12055 the bloom filters for strings correctly use
+ // UTF8.
+ if (fileVersion.includes(OrcFile.WriterVersion.HIVE_12055) &&
+ (type == TypeDescription.Category.STRING ||
+ type == TypeDescription.Category.CHAR ||
+ type == TypeDescription.Category.VARCHAR)) {
+ return new BloomFilterUtf8(values, numFuncs);
+ }
+ return new BloomFilter(values, numFuncs);
+ }
+ case BLOOM_FILTER_UTF8: {
+ ByteString bits = bloomFilter.getUtf8Bitset();
+ long[] values = new long[bits.size() / 8];
+ bits.asReadOnlyByteBuffer().order(ByteOrder.LITTLE_ENDIAN)
+ .asLongBuffer().get(values);
+ return new BloomFilterUtf8(values, numFuncs);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown bloom filter kind " + kind);
+ }
+ }
+
+ /**
+ * Serialize the BloomFilter to the ORC file.
+ * @param builder the builder to write to
+ * @param bloomFilter the bloom filter to serialize
+ */
+ public static void serialize(OrcProto.BloomFilter.Builder builder,
+ BloomFilter bloomFilter) {
+ builder.clear();
+ builder.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+ long[] bitset = bloomFilter.getBitSet();
+ if (bloomFilter instanceof BloomFilterUtf8) {
+ ByteBuffer buffer = ByteBuffer.allocate(bitset.length * 8);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ buffer.asLongBuffer().put(bitset);
+ builder.setUtf8Bitset(ByteString.copyFrom(buffer));
+ } else {
+ for(int i=0; i < bitset.length; ++i) {
+ builder.addBitset(bitset[i]);
+ }
+ }
+ }
+}
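
A round-trip sketch for the two methods above (illustration only; the newBuilder/build calls are the standard protobuf-generated builder API):

    import org.apache.orc.OrcFile;
    import org.apache.orc.OrcProto;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.util.BloomFilter;
    import org.apache.orc.util.BloomFilterIO;
    import org.apache.orc.util.BloomFilterUtf8;

    public class BloomFilterIOExample {
      public static void main(String[] args) {
        BloomFilterUtf8 original = new BloomFilterUtf8(1000, 0.05);
        original.addString("ORC-101");

        OrcProto.BloomFilter.Builder builder = OrcProto.BloomFilter.newBuilder();
        BloomFilterIO.serialize(builder, original);  // fills utf8Bitset

        BloomFilter restored = BloomFilterIO.deserialize(
            OrcProto.Stream.Kind.BLOOM_FILTER_UTF8,
            OrcFile.WriterVersion.ORC_101,           // a post-fix writer version
            TypeDescription.Category.STRING,
            builder.build());

        System.out.println(restored.testString("ORC-101")); // true
        System.out.println(original.equals(restored));      // true: same bits
      }
    }
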
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java b/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
new file mode 100644
index 0000000..aad4fab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/util/BloomFilterUtf8.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.util;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This class implements the fix from ORC-101: bloom filters hash strings
+ * as UTF-8 rather than with the JVM's default character set.
+ */
+public class BloomFilterUtf8 extends BloomFilter {
+
+ public BloomFilterUtf8(long expectedEntries, double fpp) {
+ super(expectedEntries, fpp);
+ }
+
+ public BloomFilterUtf8(long[] bits, int numFuncs) {
+ super(bits, numFuncs);
+ }
+
+ public void addString(String val) {
+ if (val == null) {
+ add(null);
+ } else {
+ add(val.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ public boolean testString(String val) {
+ if (val == null) {
+ return test(null);
+ } else {
+ return test(val.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+}
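
To see why the UTF-8 subclass matters, it is enough to compare the raw bytes the two classes hash for a non-ASCII string; the result below depends on the JVM's default charset, which is exactly the portability problem ORC-101 removes (illustration only, not part of the commit):

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class CharsetMismatchExample {
      public static void main(String[] args) {
        String value = "caf\u00e9";  // "café", non-ASCII
        byte[] utf8Bytes = value.getBytes(StandardCharsets.UTF_8);
        byte[] defaultBytes = value.getBytes(Charset.defaultCharset());
        // On a UTF-8 JVM these match; on e.g. an ISO-8859-1 JVM they differ,
        // so the legacy BloomFilter and BloomFilterUtf8 would probe different
        // bit positions for the same value and could disagree on membership.
        System.out.println(Arrays.equals(utf8Bytes, defaultBytes));
      }
    }
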
http://git-wip-us.apache.org/repos/asf/orc/blob/833cc0e7/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index af20d1f..5ef0ced 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -1904,8 +1904,8 @@ public class TestVectorOrcFile {
.withZeroCopy(false)
.build());
OrcIndex index =
- meta.readRowIndex(reader.getStripes().get(0), null, null, null, null,
- null);
+ meta.readRowIndex(reader.getStripes().get(0), null, null, false, null, null,
+ null, OrcFile.WriterVersion.ORC_101, null, null);
// check the primitive columns to make sure they have the right number of
// items in the first row group
for(int c=1; c < 9; ++c) {