You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 23:39:16 UTC
[2/4] Add a unified and optionally more constrained API for
expressing filters on columns
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
index 66fe6a0..0107b36 100644
--- a/parquet-column/src/test/java/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -15,19 +15,6 @@
*/
package parquet.io;
-import static org.junit.Assert.assertEquals;
-import static parquet.example.Paper.r1;
-import static parquet.example.Paper.r2;
-import static parquet.example.Paper.schema;
-import static parquet.filter.AndRecordFilter.and;
-import static parquet.filter.ColumnPredicates.applyFunctionToLong;
-import static parquet.filter.ColumnPredicates.applyFunctionToString;
-import static parquet.filter.ColumnPredicates.equalTo;
-import static parquet.filter.ColumnRecordFilter.column;
-import static parquet.filter.NotRecordFilter.not;
-import static parquet.filter.OrRecordFilter.or;
-import static parquet.filter.PagedRecordFilter.page;
-
import java.util.ArrayList;
import java.util.List;
@@ -41,8 +28,22 @@ import parquet.example.data.GroupWriter;
import parquet.example.data.simple.convert.GroupRecordConverter;
import parquet.filter.ColumnPredicates.LongPredicateFunction;
import parquet.filter.ColumnPredicates.PredicateFunction;
+import parquet.filter2.compat.FilterCompat;
import parquet.io.api.RecordMaterializer;
+import static org.junit.Assert.assertEquals;
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.r2;
+import static parquet.example.Paper.schema;
+import static parquet.filter.AndRecordFilter.and;
+import static parquet.filter.ColumnPredicates.applyFunctionToLong;
+import static parquet.filter.ColumnPredicates.applyFunctionToString;
+import static parquet.filter.ColumnPredicates.equalTo;
+import static parquet.filter.ColumnRecordFilter.column;
+import static parquet.filter.NotRecordFilter.not;
+import static parquet.filter.OrRecordFilter.or;
+import static parquet.filter.PagedRecordFilter.page;
+
public class TestFiltered {
/* Class that implements applyFunction filter for long. Checks for long greater than 15. */
@@ -84,15 +85,14 @@ public class TestFiltered {
// Get first record
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
- columnIO.getRecordReader(memPageStore, recordConverter,
- column("DocId", equalTo(10l)));
+ columnIO.getRecordReader(memPageStore, recordConverter, FilterCompat.get(column("DocId", equalTo(10l))));
readOne(recordReader, "r2 filtered out", r1);
// Get second record
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("DocId", equalTo(20l)));
+ FilterCompat.get(column("DocId", equalTo(20l))));
readOne(recordReader, "r1 filtered out", r2);
@@ -107,14 +107,14 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("DocId", equalTo(10l)));
+ FilterCompat.get(column("DocId", equalTo(10l))));
readOne(recordReader, "r2 filtered out", r1);
// Get second record
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("DocId", applyFunctionToLong (new LongGreaterThan15Predicate())));
+ FilterCompat.get(column("DocId", applyFunctionToLong(new LongGreaterThan15Predicate()))));
readOne(recordReader, "r1 filtered out", r2);
}
@@ -128,7 +128,7 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", equalTo("http://A")));
+ FilterCompat.get(column("Name.Url", equalTo("http://A"))));
readOne(recordReader, "r2 filtered out", r1);
@@ -136,7 +136,7 @@ public class TestFiltered {
// against the first instance of a
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", equalTo("http://B")));
+ FilterCompat.get(column("Name.Url", equalTo("http://B"))));
List<Group> all = readAll(recordReader);
assertEquals("There should be no matching records: " + all , 0, all.size());
@@ -144,7 +144,7 @@ public class TestFiltered {
// Finally try matching against the C url in record 2
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", equalTo("http://C")));
+ FilterCompat.get(column("Name.Url", equalTo("http://C"))));
readOne(recordReader, "r1 filtered out", r2);
@@ -159,7 +159,7 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", applyFunctionToString (new StringEndsWithAPredicate ())));
+ FilterCompat.get(column("Name.Url", applyFunctionToString(new StringEndsWithAPredicate()))));
readOne(recordReader, "r2 filtered out", r1);
@@ -167,7 +167,7 @@ public class TestFiltered {
// against the first instance of a
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", equalTo("http://B")));
+ FilterCompat.get(column("Name.Url", equalTo("http://B"))));
List<Group> all = readAll(recordReader);
assertEquals("There should be no matching records: " + all , 0, all.size());
@@ -175,7 +175,7 @@ public class TestFiltered {
// Finally try matching against the C url in record 2
recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- column("Name.Url", equalTo("http://C")));
+ FilterCompat.get(column("Name.Url", equalTo("http://C"))));
readOne(recordReader, "r1 filtered out", r2);
@@ -189,7 +189,7 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- page(4, 4));
+ FilterCompat.get(page(4, 4)));
List<Group> all = readAll(recordReader);
assertEquals("expecting records " + all, 4, all.size());
@@ -206,7 +206,7 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- and(column("DocId", equalTo(10l)), page(2, 4)));
+ FilterCompat.get(and(column("DocId", equalTo(10l)), page(2, 4))));
List<Group> all = readAll(recordReader);
assertEquals("expecting 4 records " + all, 4, all.size());
@@ -224,8 +224,8 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- or(column("DocId", equalTo(10l)),
- column("DocId", equalTo(20l))));
+ FilterCompat.get(or(column("DocId", equalTo(10l)),
+ column("DocId", equalTo(20l)))));
List<Group> all = readAll(recordReader);
assertEquals("expecting 8 records " + all, 16, all.size());
@@ -243,7 +243,7 @@ public class TestFiltered {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
columnIO.getRecordReader(memPageStore, recordConverter,
- not(column("DocId", equalTo(10l))));
+ FilterCompat.get(not(column("DocId", equalTo(10l)))));
List<Group> all = readAll(recordReader);
assertEquals("expecting 8 records " + all, 8, all.size());
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index 9abc8af..02abcad 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -3,7 +3,7 @@
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/Closeables.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/Closeables.java b/parquet-common/src/main/java/parquet/Closeables.java
new file mode 100644
index 0000000..9d4c213
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/Closeables.java
@@ -0,0 +1,37 @@
+package parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Utility for working with {@link java.io.Closeable}ss
+ */
+public final class Closeables {
+ private Closeables() { }
+
+ private static final Log LOG = Log.getLog(Closeables.class);
+
+ /**
+ * Closes a (potentially null) closeable.
+ * @param c can be null
+ * @throws IOException if c.close() throws an IOException.
+ */
+ public static void close(Closeable c) throws IOException {
+ if (c == null) { return; }
+ c.close();
+ }
+
+ /**
+ * Closes a (potentially null) closeable, swallowing any IOExceptions thrown by
+ * c.close(). The exception will be logged.
+ * @param c can be null
+ */
+ public static void closeAndSwallowIOExceptions(Closeable c) {
+ if (c == null) { return; }
+ try {
+ c.close();
+ } catch (IOException e) {
+ LOG.warn("Encountered exception closing closeable", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java b/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
new file mode 100644
index 0000000..3cea532
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.common.internal;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * returns canonical representation of objects (similar to String.intern()) to save memory
+ * if a.equals(b) then canonicalize(a) == canonicalize(b)
+ * this class is thread safe
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class Canonicalizer<T> {
+
+ private ConcurrentHashMap<T, T> canonicals = new ConcurrentHashMap<T, T>();
+
+ /**
+ * @param value the value to canonicalize
+ * @return the corresponding canonical value
+ */
+ final public T canonicalize(T value) {
+ T canonical = canonicals.get(value);
+ if (canonical == null) {
+ value = toCanonical(value);
+ T existing = canonicals.putIfAbsent(value, value);
+ // putIfAbsent is atomic, making sure we always return the same canonical representation of the value
+ if (existing == null) {
+ canonical = value;
+ } else {
+ canonical = existing;
+ }
+ }
+ return canonical;
+ }
+
+ /**
+ * @param value the value to canonicalize if needed
+ * @return the canonicalized value
+ */
+ protected T toCanonical(T value) {
+ return value;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java b/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
new file mode 100644
index 0000000..f3ded9c
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.common.schema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import parquet.common.internal.Canonicalizer;
+
+import static parquet.Preconditions.checkNotNull;
+
+public final class ColumnPath implements Iterable<String>, Serializable {
+
+ private static Canonicalizer<ColumnPath> paths = new Canonicalizer<ColumnPath>() {
+ @Override
+ protected ColumnPath toCanonical(ColumnPath value) {
+ String[] path = new String[value.p.length];
+ for (int i = 0; i < value.p.length; i++) {
+ path[i] = value.p[i].intern();
+ }
+ return new ColumnPath(path);
+ }
+ };
+
+ public static ColumnPath fromDotString(String path) {
+ checkNotNull(path, "path");
+ return get(path.split("\\."));
+ }
+
+ public static ColumnPath get(String... path){
+ return paths.canonicalize(new ColumnPath(path));
+ }
+
+ private final String[] p;
+
+ private ColumnPath(String[] path) {
+ this.p = path;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ColumnPath) {
+ return Arrays.equals(p, ((ColumnPath)obj).p);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(p);
+ }
+
+ public String toDotString() {
+ Iterator<String> iter = Arrays.asList(p).iterator();
+ StringBuilder sb = new StringBuilder();
+ while (iter.hasNext()) {
+ sb.append(iter.next());
+ if (iter.hasNext()) {
+ sb.append('.');
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(p);
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ return Arrays.asList(p).iterator();
+ }
+
+ public int size() {
+ return p.length;
+ }
+
+ public String[] toArray() {
+ return p;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-encoding/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml
index 6da2ea6..5840ca5 100644
--- a/parquet-encoding/pom.xml
+++ b/parquet-encoding/pom.xml
@@ -3,7 +3,7 @@
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml
index 7afc41b..1ec4a50 100644
--- a/parquet-generator/pom.xml
+++ b/parquet-generator/pom.xml
@@ -3,7 +3,7 @@
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/encoding/Generator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/encoding/Generator.java b/parquet-generator/src/main/java/parquet/encoding/Generator.java
index 58d9382..b6aa596 100644
--- a/parquet-generator/src/main/java/parquet/encoding/Generator.java
+++ b/parquet-generator/src/main/java/parquet/encoding/Generator.java
@@ -19,7 +19,7 @@ import parquet.encoding.bitpacking.ByteBasedBitPackingGenerator;
import parquet.encoding.bitpacking.IntBasedBitPackingGenerator;
/**
- * main class for code generation hook in build
+ * main class for code generation hook in build for encodings generation
*
* @author Julien Le Dem
*
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/filter2/Generator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/filter2/Generator.java b/parquet-generator/src/main/java/parquet/filter2/Generator.java
new file mode 100644
index 0000000..9818218
--- /dev/null
+++ b/parquet-generator/src/main/java/parquet/filter2/Generator.java
@@ -0,0 +1,10 @@
+package parquet.filter2;
+
+/**
+ * main class for code generation hook in build for filter2 generation
+ */
+public class Generator {
+ public static void main(String[] args) throws Exception {
+ IncrementallyUpdatedFilterPredicateGenerator.main(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
new file mode 100644
index 0000000..e0f08e4
--- /dev/null
+++ b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -0,0 +1,251 @@
+package parquet.filter2;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class IncrementallyUpdatedFilterPredicateGenerator {
+
+ public static void main(String[] args) throws IOException {
+ File srcFile = new File(args[0] + "/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilder.java");
+ srcFile = srcFile.getAbsoluteFile();
+ File parent = srcFile.getParentFile();
+ if (!parent.exists()) {
+ if (!parent.mkdirs()) {
+ throw new IOException("Couldn't mkdirs for " + parent);
+ }
+ }
+ new IncrementallyUpdatedFilterPredicateGenerator(srcFile).run();
+ }
+
+ private final FileWriter writer;
+
+ public IncrementallyUpdatedFilterPredicateGenerator(File file) throws IOException {
+ this.writer = new FileWriter(file);
+ }
+
+ private static class TypeInfo {
+ public final String className;
+ public final String primitiveName;
+ public final boolean useComparable;
+ public final boolean supportsInequality;
+
+ private TypeInfo(String className, String primitiveName, boolean useComparable, boolean supportsInequality) {
+ this.className = className;
+ this.primitiveName = primitiveName;
+ this.useComparable = useComparable;
+ this.supportsInequality = supportsInequality;
+ }
+ }
+
+ private static final TypeInfo[] TYPES = new TypeInfo[]{
+ new TypeInfo("Integer", "int", false, true),
+ new TypeInfo("Long", "long", false, true),
+ new TypeInfo("Boolean", "boolean", false, false),
+ new TypeInfo("Float", "float", false, true),
+ new TypeInfo("Double", "double", false, true),
+ new TypeInfo("Binary", "Binary", true, true),
+ };
+
+ public void run() throws IOException {
+ add("package parquet.filter2.recordlevel;\n" +
+ "\n" +
+ "import parquet.common.schema.ColumnPath;\n" +
+ "import parquet.filter2.predicate.Operators.Eq;\n" +
+ "import parquet.filter2.predicate.Operators.Gt;\n" +
+ "import parquet.filter2.predicate.Operators.GtEq;\n" +
+ "import parquet.filter2.predicate.Operators.LogicalNotUserDefined;\n" +
+ "import parquet.filter2.predicate.Operators.Lt;\n" +
+ "import parquet.filter2.predicate.Operators.LtEq;\n" +
+ "import parquet.filter2.predicate.Operators.NotEq;\n" +
+ "import parquet.filter2.predicate.Operators.UserDefined;\n" +
+ "import parquet.filter2.predicate.UserDefinedPredicate;\n" +
+ "import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n" +
+ "import parquet.io.api.Binary;\n\n" +
+ "/**\n" +
+ " * This class is auto-generated by {@link parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}\n" +
+ " * Do not manually edit!\n" +
+ " * See {@link IncrementallyUpdatedFilterPredicateBuilderBase}\n" +
+ " */\n");
+
+ add("public class IncrementallyUpdatedFilterPredicateBuilder extends IncrementallyUpdatedFilterPredicateBuilderBase {\n\n");
+
+ addVisitBegin("Eq");
+ for (TypeInfo info : TYPES) {
+ addEqNotEqCase(info, true);
+ }
+ addVisitEnd();
+
+ addVisitBegin("NotEq");
+ for (TypeInfo info : TYPES) {
+ addEqNotEqCase(info, false);
+ }
+ addVisitEnd();
+
+ addVisitBegin("Lt");
+ for (TypeInfo info : TYPES) {
+ addInequalityCase(info, "<");
+ }
+ addVisitEnd();
+
+ addVisitBegin("LtEq");
+ for (TypeInfo info : TYPES) {
+ addInequalityCase(info, "<=");
+ }
+ addVisitEnd();
+
+ addVisitBegin("Gt");
+ for (TypeInfo info : TYPES) {
+ addInequalityCase(info, ">");
+ }
+ addVisitEnd();
+
+ addVisitBegin("GtEq");
+ for (TypeInfo info : TYPES) {
+ addInequalityCase(info, ">=");
+ }
+ addVisitEnd();
+
+ add(" @Override\n" +
+ " public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> IncrementallyUpdatedFilterPredicate visit(UserDefined<T, U> pred) {\n");
+ addUdpBegin();
+ for (TypeInfo info : TYPES) {
+ addUdpCase(info, false);
+ }
+ addVisitEnd();
+
+ add(" @Override\n" +
+ " public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> IncrementallyUpdatedFilterPredicate visit(LogicalNotUserDefined<T, U> notPred) {\n" +
+ " UserDefined<T, U> pred = notPred.getUserDefined();\n");
+ addUdpBegin();
+ for (TypeInfo info : TYPES) {
+ addUdpCase(info, true);
+ }
+ addVisitEnd();
+
+ add("}\n");
+ writer.close();
+ }
+
+ private void addVisitBegin(String inVar) throws IOException {
+ add(" @Override\n" +
+ " public <T extends Comparable<T>> IncrementallyUpdatedFilterPredicate visit(" + inVar + "<T> pred) {\n" +
+ " ColumnPath columnPath = pred.getColumn().getColumnPath();\n" +
+ " Class<T> clazz = pred.getColumn().getColumnType();\n" +
+ "\n" +
+ " ValueInspector valueInspector = null;\n\n");
+ }
+
+ private void addVisitEnd() throws IOException {
+ add(" if (valueInspector == null) {\n" +
+ " throw new IllegalArgumentException(\"Encountered unknown type \" + clazz);\n" +
+ " }\n" +
+ "\n" +
+ " addValueInspector(columnPath, valueInspector);\n" +
+ " return valueInspector;\n" +
+ " }\n\n");
+ }
+
+ private void addEqNotEqCase(TypeInfo info, boolean isEq) throws IOException {
+ add(" if (clazz.equals(" + info.className + ".class)) {\n" +
+ " if (pred.getValue() == null) {\n" +
+ " valueInspector = new ValueInspector() {\n" +
+ " @Override\n" +
+ " public void updateNull() {\n" +
+ " setResult(" + isEq + ");\n" +
+ " }\n" +
+ "\n" +
+ " @Override\n" +
+ " public void update(" + info.primitiveName + " value) {\n" +
+ " setResult(" + !isEq + ");\n" +
+ " }\n" +
+ " };\n" +
+ " } else {\n" +
+ " final " + info.primitiveName + " target = (" + info.className + ") (Object) pred.getValue();\n" +
+ "\n" +
+ " valueInspector = new ValueInspector() {\n" +
+ " @Override\n" +
+ " public void updateNull() {\n" +
+ " setResult(" + !isEq +");\n" +
+ " }\n" +
+ "\n" +
+ " @Override\n" +
+ " public void update(" + info.primitiveName + " value) {\n");
+
+ if (info.useComparable) {
+ add(" setResult(" + compareEquality("value", "target", isEq) + ");\n");
+ } else {
+ add(" setResult(" + (isEq ? "value == target" : "value != target" ) + ");\n");
+ }
+
+ add(" }\n" +
+ " };\n" +
+ " }\n" +
+ " }\n\n");
+ }
+
+ private void addInequalityCase(TypeInfo info, String op) throws IOException {
+ if (!info.supportsInequality) {
+ add(" if (clazz.equals(" + info.className + ".class)) {\n");
+ add(" throw new IllegalArgumentException(\"Operator " + op + " not supported for " + info.className + "\");\n");
+ add(" }\n\n");
+ return;
+ }
+
+ add(" if (clazz.equals(" + info.className + ".class)) {\n" +
+ " final " + info.primitiveName + " target = (" + info.className + ") (Object) pred.getValue();\n" +
+ "\n" +
+ " valueInspector = new ValueInspector() {\n" +
+ " @Override\n" +
+ " public void updateNull() {\n" +
+ " setResult(false);\n" +
+ " }\n" +
+ "\n" +
+ " @Override\n" +
+ " public void update(" + info.primitiveName + " value) {\n");
+
+ if (info.useComparable) {
+ add(" setResult(value.compareTo(target) " + op + " 0);\n");
+ } else {
+ add(" setResult(value " + op + " target);\n");
+ }
+ add(" }\n" +
+ " };\n" +
+ " }\n\n");
+ }
+
+ private void addUdpBegin() throws IOException {
+ add(" ColumnPath columnPath = pred.getColumn().getColumnPath();\n" +
+ " Class<T> clazz = pred.getColumn().getColumnType();\n" +
+ "\n" +
+ " ValueInspector valueInspector = null;\n" +
+ "\n" +
+ " final U udp = pred.getUserDefinedPredicate();\n" +
+ "\n");
+ }
+
+ private void addUdpCase(TypeInfo info, boolean invert)throws IOException {
+ add(" if (clazz.equals(" + info.className + ".class)) {\n" +
+ " valueInspector = new ValueInspector() {\n" +
+ " @Override\n" +
+ " public void updateNull() {\n" +
+ " setResult(" + (invert ? "!" : "") + "udp.keep(null));\n" +
+ " }\n" +
+ "\n" +
+ " @SuppressWarnings(\"unchecked\")\n" +
+ " @Override\n" +
+ " public void update(" + info.primitiveName + " value) {\n" +
+ " setResult(" + (invert ? "!" : "") + "udp.keep((T) (Object) value));\n" +
+ " }\n" +
+ " };\n" +
+ " }\n\n");
+ }
+
+ private String compareEquality(String var, String target, boolean eq) {
+ return var + ".compareTo(" + target + ")" + (eq ? " == 0 " : " != 0");
+ }
+
+ private void add(String s) throws IOException {
+ writer.write(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml
index c3caa2f..ae89233 100644
--- a/parquet-hadoop-bundle/pom.xml
+++ b/parquet-hadoop-bundle/pom.xml
@@ -3,7 +3,7 @@
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index c1b4f07..d074b0c 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -3,7 +3,7 @@
<groupId>com.twitter</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
- <version>1.5.1-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -78,9 +78,11 @@
<build>
<plugins>
+<!-- turned off temporarily, must be turned back on after 1.6.0 is released.
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
+-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
new file mode 100644
index 0000000..4da9821
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
@@ -0,0 +1,63 @@
+package parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.FilterCompat.NoOpFilter;
+import parquet.filter2.compat.FilterCompat.Visitor;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.SchemaCompatibilityValidator;
+import parquet.filter2.statisticslevel.StatisticsFilter;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.schema.MessageType;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Given a {@link Filter} applies it to a list of BlockMetaData (row groups)
+ * If the Filter is an {@link parquet.filter.UnboundRecordFilter} or the no op filter,
+ * no filtering will be performed.
+ */
+public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
+ private final List<BlockMetaData> blocks;
+ private final MessageType schema;
+
+ public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
+ checkNotNull(filter, "filter");
+ return filter.accept(new RowGroupFilter(blocks, schema));
+ }
+
+ private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
+ this.blocks = checkNotNull(blocks, "blocks");
+ this.schema = checkNotNull(schema, "schema");
+ }
+
+ @Override
+ public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
+ FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
+
+ // check that the schema of the filter matches the schema of the file
+ SchemaCompatibilityValidator.validate(filterPredicate, schema);
+
+ List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
+
+ for (BlockMetaData block : blocks) {
+ if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
+ filteredBlocks.add(block);
+ }
+ }
+
+ return filteredBlocks;
+ }
+
+ @Override
+ public List<BlockMetaData> visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
+ return blocks;
+ }
+
+ @Override
+ public List<BlockMetaData> visit(NoOpFilter noOpFilter) {
+ return blocks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
new file mode 100644
index 0000000..4daed5a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -0,0 +1,244 @@
+package parquet.filter2.statisticslevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+import parquet.filter2.predicate.UserDefinedPredicate;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import static parquet.Preconditions.checkArgument;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Applies a {@link parquet.filter2.predicate.FilterPredicate} to statistics about a group of
+ * records.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator as this is not
+ * supported by this filter.
+ *
+ * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * in a form that doesn't make use of the not() operator.
+ *
+ * the supplied predicate should also have already been run through
+ * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * Returns true if all the records represented by the statistics in the provided column metadata can be dropped.
+ * false otherwise (including when it is not known, which is often the case).
+ */
+// TODO: this belongs in the parquet-column project, but some of the classes here need to be moved too
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
+public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
+
+ public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
+ checkNotNull(pred, "pred");
+ checkNotNull(columns, "columns");
+ return pred.accept(new StatisticsFilter(columns));
+ }
+
+ private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+ private StatisticsFilter(List<ColumnChunkMetaData> columnsList) {
+ for (ColumnChunkMetaData chunk : columnsList) {
+ columns.put(chunk.getPath(), chunk);
+ }
+ }
+
+ private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+ ColumnChunkMetaData c = columns.get(columnPath);
+ checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
+ return c;
+ }
+
+ // is this column chunk composed entirely of nulls?
+ private boolean isAllNulls(ColumnChunkMetaData column) {
+ return column.getStatistics().getNumNulls() == column.getValueCount();
+ }
+
+ // are there any nulls in this column chunk?
+ private boolean hasNulls(ColumnChunkMetaData column) {
+ return column.getStatistics().getNumNulls() > 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
+ Column<T> filterColumn = eq.getColumn();
+ T value = eq.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (value == null) {
+ // we are looking for records where v eq(null)
+ // so drop if there are no nulls in this chunk
+ return !hasNulls(columnChunk);
+ }
+
+ if (isAllNulls(columnChunk)) {
+ // we are looking for records where v eq(someNonNull)
+ // and this is a column of all nulls, so drop it
+ return true;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if value < min || value > max
+ return value.compareTo(stats.genericGetMin()) < 0 || value.compareTo(stats.genericGetMax()) > 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
+ Column<T> filterColumn = notEq.getColumn();
+ T value = notEq.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (value == null) {
+ // we are looking for records where v notEq(null)
+ // so, if this is a column of all nulls, we can drop it
+ return isAllNulls(columnChunk);
+ }
+
+ if (hasNulls(columnChunk)) {
+ // we are looking for records where v notEq(someNonNull)
+ // but this chunk contains nulls, we cannot drop it
+ return false;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if this is a column where min = max = value
+ return value.compareTo(stats.genericGetMin()) == 0 && value.compareTo(stats.genericGetMax()) == 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
+ Column<T> filterColumn = lt.getColumn();
+ T value = lt.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (isAllNulls(columnChunk)) {
+ // we are looking for records where v < someValue
+ // this chunk is all nulls, so we can drop it
+ return true;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if value <= min
+ return value.compareTo(stats.genericGetMin()) <= 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
+ Column<T> filterColumn = ltEq.getColumn();
+ T value = ltEq.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (isAllNulls(columnChunk)) {
+ // we are looking for records where v <= someValue
+ // this chunk is all nulls, so we can drop it
+ return true;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if value < min
+ return value.compareTo(stats.genericGetMin()) < 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
+ Column<T> filterColumn = gt.getColumn();
+ T value = gt.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (isAllNulls(columnChunk)) {
+ // we are looking for records where v > someValue
+ // this chunk is all nulls, so we can drop it
+ return true;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if value >= max
+ return value.compareTo(stats.genericGetMax()) >= 0;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
+ Column<T> filterColumn = gtEq.getColumn();
+ T value = gtEq.getValue();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+ if (isAllNulls(columnChunk)) {
+ // we are looking for records where v >= someValue
+ // this chunk is all nulls, so we can drop it
+ return true;
+ }
+
+ Statistics<T> stats = columnChunk.getStatistics();
+
+ // drop if value >= max
+ return value.compareTo(stats.genericGetMax()) > 0;
+ }
+
+ @Override
+ public Boolean visit(And and) {
+ return and.getLeft().accept(this) && and.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Or or) {
+ // seems unintuitive to put an && not an || here
+ // but we can only drop a chunk of records if we know that
+ // both the left and right predicates agree that no matter what
+ // we don't need this chunk.
+ return or.getLeft().accept(this) && or.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Not not) {
+ throw new IllegalArgumentException(
+ "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+ }
+
+ private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+ Column<T> filterColumn = ud.getColumn();
+ ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+ U udp = ud.getUserDefinedPredicate();
+ Statistics<T> stats = columnChunk.getStatistics();
+ parquet.filter2.predicate.Statistics<T> udpStats =
+ new parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), stats.genericGetMax());
+
+ if (inverted) {
+ return udp.inverseCanDrop(udpStats);
+ } else {
+ return udp.canDrop(udpStats);
+ }
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud) {
+ return visit(ud, false);
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> lnud) {
+ return visit(lnud.getUserDefined(), true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 81a39b8..5bd6869 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -15,9 +15,6 @@
*/
package parquet.format.converter;
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,8 +30,9 @@ import java.util.Map.Entry;
import java.util.Set;
import parquet.Log;
-import parquet.format.ConvertedType;
+import parquet.common.schema.ColumnPath;
import parquet.format.ColumnChunk;
+import parquet.format.ConvertedType;
import parquet.format.DataPageHeader;
import parquet.format.DictionaryPageHeader;
import parquet.format.Encoding;
@@ -49,11 +47,9 @@ import parquet.format.Statistics;
import parquet.format.Type;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ParquetDecodingException;
-import parquet.schema.Types;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;
@@ -61,6 +57,10 @@ import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;
import parquet.schema.TypeVisitor;
+import parquet.schema.Types;
+
+import static parquet.format.Util.readFileMetaData;
+import static parquet.format.Util.writePageHeader;
public class ParquetMetadataConverter {
private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index f3aa81f..5a9b019 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -18,12 +18,16 @@ package parquet.hadoop;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+
import parquet.Log;
import parquet.column.ColumnDescriptor;
import parquet.column.page.PageReadStore;
import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.util.counters.BenchmarkCounter;
@@ -37,12 +41,14 @@ import parquet.schema.Type;
import static java.lang.String.format;
import static parquet.Log.DEBUG;
+import static parquet.Preconditions.checkNotNull;
import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
class InternalParquetRecordReader<T> {
private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+ private final Filter filter;
private MessageType requestedSchema;
private MessageType fileSchema;
@@ -57,7 +63,6 @@ class InternalParquetRecordReader<T> {
private int currentBlock = -1;
private ParquetFileReader reader;
private parquet.io.RecordReader<T> recordReader;
- private UnboundRecordFilter recordFilter;
private boolean strictTypeChecking;
private long totalTimeSpentReadingBytes;
@@ -70,19 +75,28 @@ class InternalParquetRecordReader<T> {
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+ * @param filter for filtering individual records
+ */
+ public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+ this.readSupport = readSupport;
+ this.filter = checkNotNull(filter, "filter");
+ }
+
+ /**
+ * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
*/
public InternalParquetRecordReader(ReadSupport<T> readSupport) {
- this(readSupport, null);
+ this(readSupport, FilterCompat.NOOP);
}
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
* @param filter Optional filter for only returning matching records.
+ * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
*/
- public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
- filter) {
- this.readSupport = readSupport;
- this.recordFilter = filter;
+ @Deprecated
+ public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+ this(readSupport, FilterCompat.get(filter));
}
private void checkRead() throws IOException {
@@ -109,7 +123,7 @@ class InternalParquetRecordReader<T> {
LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
- recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+ recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
startedAssemblingCurrentBlockAt = System.currentTimeMillis();
totalCountLoadedSoFar += pages.getRowCount();
++ currentBlock;
@@ -169,27 +183,36 @@ class InternalParquetRecordReader<T> {
}
public boolean nextKeyValue() throws IOException, InterruptedException {
- if (current < total) {
+ boolean recordFound = false;
+
+ while (!recordFound) {
+ // no more records left
+ if (current >= total) { return false; }
+
try {
checkRead();
currentValue = recordReader.read();
current ++;
- while (currentValue == null) { // only happens with FilteredRecordReader at end of block
+ if (recordReader.shouldSkipCurrentRecord()) {
+ // this record is being filtered via the filter2 package
+ if (DEBUG) LOG.debug("skipping record");
+ continue;
+ }
+
+ if (currentValue == null) {
+ // only happens with FilteredRecordReader at end of block
current = totalCountLoadedSoFar;
- if (current < total) {
- checkRead();
- currentValue = recordReader.read();
- current ++;
- continue;
- }
- return false;
+ if (DEBUG) LOG.debug("filtered record reader reached end of block");
+ continue;
}
+
+ recordFound = true;
+
if (DEBUG) LOG.debug("read value: " + currentValue);
} catch (RuntimeException e) {
throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
}
- return true;
}
- return false;
+ return true;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index 2a2f054..e660c9f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,11 +15,6 @@
*/
package parquet.hadoop;
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
@@ -52,6 +47,7 @@ import parquet.column.ColumnDescriptor;
import parquet.column.page.DictionaryPage;
import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
+import parquet.common.schema.ColumnPath;
import parquet.format.PageHeader;
import parquet.format.Util;
import parquet.format.converter.ParquetMetadataConverter;
@@ -59,11 +55,15 @@ import parquet.hadoop.CodecFactory.BytesDecompressor;
import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.hadoop.util.counters.BenchmarkCounter;
import parquet.io.ParquetDecodingException;
+import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.hadoop.ParquetFileWriter.MAGIC;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+
/**
* Internal implementation of the Parquet file reader as a block container
*
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index ff29179..f3ef61b 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -15,9 +15,6 @@
*/
package parquet.hadoop;
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -40,10 +37,10 @@ import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.column.page.DictionaryPage;
import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.GlobalMetaData;
@@ -52,6 +49,9 @@ import parquet.io.ParquetEncodingException;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
/**
* Internal implementation of the Parquet file writer as a block container
*
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 882d2f7..0231ccd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -43,6 +43,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import parquet.Log;
import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
+import parquet.filter2.predicate.FilterPredicate;
import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.api.ReadSupport.ReadContext;
@@ -53,10 +57,13 @@ import parquet.hadoop.metadata.GlobalMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.hadoop.util.ConfigurationUtil;
import parquet.hadoop.util.ContextUtil;
+import parquet.hadoop.util.SerializationUtil;
import parquet.io.ParquetDecodingException;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
+import static parquet.Preconditions.checkArgument;
+
/**
* The input format to read a Parquet file.
*
@@ -88,6 +95,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
+ /**
+ * key to configure the filter predicate
+ */
+ public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
+
private static final int MIN_FOOTER_CACHE_SIZE = 100;
private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
@@ -99,13 +111,40 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
}
public static void setUnboundRecordFilter(Job job, Class<? extends UnboundRecordFilter> filterClass) {
- ContextUtil.getConfiguration(job).set(UNBOUND_RECORD_FILTER, filterClass.getName());
+ Configuration conf = ContextUtil.getConfiguration(job);
+ checkArgument(getFilterPredicate(conf) == null,
+ "You cannot provide an UnboundRecordFilter after providing a FilterPredicate");
+
+ conf.set(UNBOUND_RECORD_FILTER, filterClass.getName());
}
+ /**
+ * @deprecated use {@link #getFilter(Configuration)}
+ */
+ @Deprecated
public static Class<?> getUnboundRecordFilter(Configuration configuration) {
return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
}
+ private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+ Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+ if (clazz == null) { return null; }
+
+ try {
+ UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+ if (unboundRecordFilter instanceof Configurable) {
+ ((Configurable)unboundRecordFilter).setConf(configuration);
+ }
+
+ return unboundRecordFilter;
+ } catch (InstantiationException e) {
+ throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+ } catch (IllegalAccessException e) {
+ throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+ }
+ }
+
public static void setReadSupportClass(JobConf conf, Class<?> readSupportClass) {
conf.set(READ_SUPPORT_CLASS, readSupportClass.getName());
}
@@ -114,6 +153,34 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class);
}
+ public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) {
+ checkArgument(getUnboundRecordFilter(configuration) == null,
+ "You cannot provide a FilterPredicate after providing an UnboundRecordFilter");
+
+ configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString());
+ try {
+ SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static FilterPredicate getFilterPredicate(Configuration configuration) {
+ try {
+ return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns a non-null Filter, which is a wrapper around either a
+ * FilterPredicate, an UnboundRecordFilter, or a no-op filter.
+ */
+ public static Filter getFilter(Configuration conf) {
+ return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
+ }
+
/**
* Hadoop will instantiate using this constructor
*/
@@ -135,24 +202,12 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
public RecordReader<Void, T> createRecordReader(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+ ReadSupport<T> readSupport = getReadSupport(ContextUtil.getConfiguration(taskAttemptContext));
+
Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
- ReadSupport<T> readSupport = getReadSupport(conf);
- Class<?> unboundRecordFilterClass = getUnboundRecordFilter(conf);
- if (unboundRecordFilterClass == null) {
- return new ParquetRecordReader<T>(readSupport);
- } else {
- try {
- UnboundRecordFilter filter = (UnboundRecordFilter)unboundRecordFilterClass.newInstance();
- if (filter instanceof Configurable) {
- ((Configurable)filter).setConf(conf);
- }
- return new ParquetRecordReader<T>(readSupport, filter);
- } catch (InstantiationException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
- } catch (IllegalAccessException e) {
- throw new BadConfigurationException("could not instantiate unbound record filter class", e);
- }
- }
+
+ return new ParquetRecordReader<T>(readSupport, getFilter(conf));
}
/**
@@ -381,6 +436,12 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
configuration,
globalMetaData.getKeyValueMetaData(),
globalMetaData.getSchema()));
+
+ Filter filter = getFilter(configuration);
+
+ long rowGroupsDropped = 0;
+ long totalRowGroups = 0;
+
for (Footer footer : footers) {
final Path file = footer.getFile();
LOG.debug(file);
@@ -388,10 +449,21 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
FileStatus fileStatus = fs.getFileStatus(file);
ParquetMetadata parquetMetaData = footer.getParquetMetadata();
List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+ List<BlockMetaData> filteredBlocks = blocks;
+
+ totalRowGroups += blocks.size();
+ filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+ rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+ if (filteredBlocks.isEmpty()) {
+ continue;
+ }
+
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
splits.addAll(
generateSplits(
- blocks,
+ filteredBlocks,
fileBlockLocations,
fileStatus,
parquetMetaData.getFileMetaData(),
@@ -401,6 +473,14 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
maxSplitSize)
);
}
+
+ if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+ int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+ LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+ } else {
+ LOG.info("There were no row groups that could be dropped due to filter predicates");
+ }
+
return splits;
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index 9e9b4ff..da0c2ec 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import parquet.Log;
import parquet.column.Encoding;
import parquet.column.statistics.IntStatistics;
+import parquet.common.schema.ColumnPath;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index 3e85331..c56a402 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -17,7 +17,6 @@ package parquet.hadoop;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -30,6 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.api.ReadSupport.ReadContext;
@@ -37,26 +39,32 @@ import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.GlobalMetaData;
import parquet.schema.MessageType;
+import static parquet.Preconditions.checkNotNull;
+
/**
* Read records from a Parquet file.
+ * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
*/
public class ParquetReader<T> implements Closeable {
- private ReadSupport<T> readSupport;
- private UnboundRecordFilter filter;
- private Configuration conf;
- private ReadContext readContext;
- private Iterator<Footer> footersIterator;
+ private final ReadSupport<T> readSupport;
+ private final Configuration conf;
+ private final ReadContext readContext;
+ private final Iterator<Footer> footersIterator;
+ private final GlobalMetaData globalMetaData;
+ private final Filter filter;
+
private InternalParquetRecordReader<T> reader;
- private GlobalMetaData globalMetaData;
/**
* @param file the file to read
* @param readSupport to materialize records
* @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
+ @Deprecated
public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
- this(file, readSupport, null);
+ this(new Configuration(), file, readSupport, FilterCompat.NOOP);
}
/**
@@ -64,31 +72,44 @@ public class ParquetReader<T> implements Closeable {
* @param file the file to read
* @param readSupport to materialize records
* @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
+ @Deprecated
public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
- this(conf, file, readSupport, null);
+ this(conf, file, readSupport, FilterCompat.NOOP);
}
/**
* @param file the file to read
* @param readSupport to materialize records
- * @param filter the filter to use to filter records
+ * @param unboundRecordFilter the filter to use to filter records
* @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
- public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
- this(new Configuration(), file, readSupport, filter);
+ @Deprecated
+ public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
}
/**
* @param conf the configuration
* @param file the file to read
* @param readSupport to materialize records
- * @param filter the filter to use to filter records
+ * @param unboundRecordFilter the filter to use to filter records
* @throws IOException
+ * @deprecated use {@link #builder(ReadSupport, Path)}
*/
- public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+ @Deprecated
+ public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
+ }
+
+ private ParquetReader(Configuration conf,
+ Path file,
+ ReadSupport<T> readSupport,
+ Filter filter) throws IOException {
this.readSupport = readSupport;
- this.filter = filter;
+ this.filter = checkNotNull(filter, "filter");
this.conf = conf;
FileSystem fs = file.getFileSystem(conf);
@@ -96,12 +117,6 @@ public class ParquetReader<T> implements Closeable {
List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
this.footersIterator = footers.iterator();
globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- for (Footer footer : footers) {
- blocks.addAll(footer.getParquetMetadata().getBlocks());
- }
-
MessageType schema = globalMetaData.getSchema();
Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
@@ -131,10 +146,15 @@ public class ParquetReader<T> implements Closeable {
}
if (footersIterator.hasNext()) {
Footer footer = footersIterator.next();
+
+ List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+
+ List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, footer.getParquetMetadata().getFileMetaData().getSchema());
+
reader = new InternalParquetRecordReader<T>(readSupport, filter);
reader.initialize(
readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
- readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+ readContext.getReadSupportMetadata(), footer.getFile(), filteredBlocks, conf);
}
}
@@ -144,4 +164,36 @@ public class ParquetReader<T> implements Closeable {
reader.close();
}
}
+
+ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
+ return new Builder<T>(readSupport, path);
+ }
+
+ public static class Builder<T> {
+ private final ReadSupport<T> readSupport;
+ private final Path file;
+ private Configuration conf;
+ private Filter filter;
+
+ private Builder(ReadSupport<T> readSupport, Path path) {
+ this.readSupport = checkNotNull(readSupport, "readSupport");
+ this.file = checkNotNull(path, "path");
+ this.conf = new Configuration();
+ this.filter = FilterCompat.NOOP;
+ }
+
+ public Builder<T> withConf(Configuration conf) {
+ this.conf = checkNotNull(conf, "conf");
+ return this;
+ }
+
+ public Builder<T> withFilter(Filter filter) {
+ this.filter = checkNotNull(filter, "filter");
+ return this;
+ }
+
+ public ParquetReader<T> build() throws IOException {
+ return new ParquetReader<T>(conf, file, readSupport, filter);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index f6f4815..67c7dd7 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -16,17 +16,21 @@
package parquet.hadoop;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
import parquet.Log;
import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.util.counters.BenchmarkCounter;
import parquet.hadoop.util.ContextUtil;
+import parquet.hadoop.util.counters.BenchmarkCounter;
import parquet.schema.MessageTypeParser;
/**
@@ -41,24 +45,34 @@ import parquet.schema.MessageTypeParser;
public class ParquetRecordReader<T> extends RecordReader<Void, T> {
private static final Log LOG= Log.getLog(ParquetRecordReader.class);
- private InternalParquetRecordReader<T> internalReader;
+ private final InternalParquetRecordReader<T> internalReader;
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
*/
public ParquetRecordReader(ReadSupport<T> readSupport) {
- this(readSupport, null);
+ this(readSupport, FilterCompat.NOOP);
}
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
- * @param filter Optional filter for only returning matching records.
+ * @param filter for filtering individual records
*/
- public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+ public ParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
internalReader = new InternalParquetRecordReader<T>(readSupport, filter);
}
/**
+ * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+ * @param filter for filtering individual records
+ * @deprecated use {@link #ParquetRecordReader(ReadSupport, Filter)}
+ */
+ @Deprecated
+ public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+ this(readSupport, FilterCompat.get(filter));
+ }
+
+ /**
* {@inheritDoc}
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
index 9c24475..41f27ed 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
@@ -204,6 +204,19 @@ public class ParquetWriter<T> implements Closeable {
this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}
+ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport) throws IOException {
+ this(file,
+ writeSupport,
+ DEFAULT_COMPRESSION_CODEC_NAME,
+ DEFAULT_BLOCK_SIZE,
+ DEFAULT_PAGE_SIZE,
+ DEFAULT_PAGE_SIZE,
+ DEFAULT_IS_DICTIONARY_ENABLED,
+ DEFAULT_IS_VALIDATING_ENABLED,
+ DEFAULT_WRITER_VERSION,
+ conf);
+ }
+
public void write(T object) throws IOException {
try {
writer.write(object);
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index d0b723b..9544865 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -18,23 +18,18 @@ package parquet.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.List;
import java.util.Arrays;
+import java.util.List;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
+import parquet.hadoop.Footer;
import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.ParquetInputSplit;
import parquet.hadoop.ParquetRecordReader;
-import parquet.hadoop.Footer;
@SuppressWarnings("deprecation")
public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
@@ -87,7 +82,9 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
splitLen = oldSplit.getLength();
try {
- realReader = new ParquetRecordReader<V>(newInputFormat.getReadSupport(oldJobConf));
+ realReader = new ParquetRecordReader<V>(newInputFormat.getReadSupport(oldJobConf),
+ ParquetInputFormat.getFilter(oldJobConf));
+
realReader.initialize(((ParquetInputSplitWrapper)oldSplit).realSplit, oldJobConf, reporter);
// read once to gain access to key and value objects
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
deleted file mode 100644
index ece6e63..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright 2014 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop.metadata;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * returns canonical representation of objects (similar to String.intern()) to save memory
- * if a.equals(b) then canonicalize(a) == canonicalize(b)
- * this class is thread safe
- * @author Julien Le Dem
- *
- * @param <T>
- */
-public class Canonicalizer<T> {
-
- private ConcurrentHashMap<T, T> canonicals = new ConcurrentHashMap<T, T>();
-
- /**
- * @param value the value to canonicalize
- * @return the corresponding canonical value
- */
- final public T canonicalize(T value) {
- T canonical = canonicals.get(value);
- if (canonical == null) {
- value = toCanonical(value);
- T existing = canonicals.putIfAbsent(value, value);
- // putIfAbsent is atomic, making sure we always return the same canonical representation of the value
- if (existing == null) {
- canonical = value;
- } else {
- canonical = existing;
- }
- }
- return canonical;
- }
-
- /**
- * @param value the value to canonicalize if needed
- * @return the canonicalized value
- */
- protected T toCanonical(T value) {
- return value;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 98de367..45af78a 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -18,8 +18,9 @@ package parquet.hadoop.metadata;
import java.util.Set;
import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
/**
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
index 074a900..9b9a7a8 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
@@ -19,6 +19,8 @@ import java.util.Arrays;
import java.util.Set;
import parquet.column.Encoding;
+import parquet.common.internal.Canonicalizer;
+import parquet.common.schema.ColumnPath;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
public class ColumnChunkProperties {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
deleted file mode 100644
index b179ae3..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop.metadata;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-public final class ColumnPath implements Iterable<String> {
-
- private static Canonicalizer<ColumnPath> paths = new Canonicalizer<ColumnPath>() {
- protected ColumnPath toCanonical(ColumnPath value) {
- String[] path = new String[value.p.length];
- for (int i = 0; i < value.p.length; i++) {
- path[i] = value.p[i].intern();
- }
- return new ColumnPath(path);
- }
- };
-
- public static ColumnPath get(String... path){
- return paths.canonicalize(new ColumnPath(path));
- }
-
- private final String[] p;
-
- private ColumnPath(String[] path) {
- this.p = path;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ColumnPath) {
- return Arrays.equals(p, ((ColumnPath)obj).p);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(p);
- }
-
- @Override
- public String toString() {
- return Arrays.toString(p);
- }
-
- @Override
- public Iterator<String> iterator() {
- return Arrays.asList(p).iterator();
- }
-
- public int size() {
- return p.length;
- }
-
- public String[] toArray() {
- return p;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
index 790b601..6121111 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.List;
import parquet.column.Encoding;
+import parquet.common.internal.Canonicalizer;
public class EncodingList implements Iterable<Encoding> {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
new file mode 100644
index 0000000..0cd5df5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
@@ -0,0 +1,93 @@
+package parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.Closeables;
+import parquet.Log;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/util/HadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public final class SerializationUtil {
+ private static final Log LOG = Log.getLog(SerializationUtil.class);
+
+ private SerializationUtil() { }
+
+ /**
+ * Reads an object (that was written using
+ * {@link #writeObjectToConfAsBase64}) from a configuration.
+ *
+ * @param key for the configuration
+ * @param conf to read from
+ * @return the read object, or null if key is not present in conf
+ * @throws IOException
+ */
+ public static void writeObjectToConfAsBase64(String key, Object obj, Configuration conf) throws IOException {
+ ByteArrayOutputStream baos = null;
+ GZIPOutputStream gos = null;
+ ObjectOutputStream oos = null;
+
+ try {
+ baos = new ByteArrayOutputStream();
+ gos = new GZIPOutputStream(baos);
+ oos = new ObjectOutputStream(gos);
+ oos.writeObject(obj);
+ } finally {
+ Closeables.close(oos);
+ Closeables.close(gos);
+ Closeables.close(baos);
+ }
+
+ conf.set(key, new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8"));
+ }
+
+ /**
+ * Reads an object (that was written using
+ * {@link #writeObjectToConfAsBase64}) from a configuration
+ *
+ * @param key for the configuration
+ * @param conf to read from
+ * @return the read object, or null if key is not present in conf
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException {
+ String b64 = conf.get(key);
+ if (b64 == null) {
+ return null;
+ }
+
+ byte[] bytes = Base64.decodeBase64(b64.getBytes("UTF-8"));
+
+ ByteArrayInputStream bais = null;
+ GZIPInputStream gis = null;
+ ObjectInputStream ois = null;
+
+ try {
+ bais = new ByteArrayInputStream(bytes);
+ gis = new GZIPInputStream(bais);
+ ois = new ObjectInputStream(gis);
+ return (T) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not read object from config with key " + key, e);
+ } catch (ClassCastException e) {
+ throw new IOException("Couldn't cast object read from config with key " + key, e);
+ } finally {
+ Closeables.close(ois);
+ Closeables.close(gis);
+ Closeables.close(bais);
+ }
+ }
+}