You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:27 UTC
[30/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
deleted file mode 100644
index 8feace9..0000000
--- a/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.predicate;
-
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Locale;
-
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.io.api.Binary;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * These are the operators in a filter predicate expression tree.
- * They are constructed by using the methods in {@link FilterApi}
- */
-public final class Operators {
- private Operators() { }
-
- public static abstract class Column<T extends Comparable<T>> implements Serializable {
- private final ColumnPath columnPath;
- private final Class<T> columnType;
-
- protected Column(ColumnPath columnPath, Class<T> columnType) {
- checkNotNull(columnPath, "columnPath");
- checkNotNull(columnType, "columnType");
- this.columnPath = columnPath;
- this.columnType = columnType;
- }
-
- public Class<T> getColumnType() {
- return columnType;
- }
-
- public ColumnPath getColumnPath() {
- return columnPath;
- }
-
- @Override
- public String toString() {
- return "column(" + columnPath.toDotString() + ")";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- Column column = (Column) o;
-
- if (!columnType.equals(column.columnType)) return false;
- if (!columnPath.equals(column.columnPath)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = columnPath.hashCode();
- result = 31 * result + columnType.hashCode();
- return result;
- }
- }
-
- public static interface SupportsEqNotEq { } // marker for columns that can be used with eq() and notEq()
- public static interface SupportsLtGt extends SupportsEqNotEq { } // marker for columns that can be used with lt(), ltEq(), gt(), gtEq()
-
- public static final class IntColumn extends Column<Integer> implements SupportsLtGt {
- IntColumn(ColumnPath columnPath) {
- super(columnPath, Integer.class);
- }
- }
-
- public static final class LongColumn extends Column<Long> implements SupportsLtGt {
- LongColumn(ColumnPath columnPath) {
- super(columnPath, Long.class);
- }
- }
-
- public static final class DoubleColumn extends Column<Double> implements SupportsLtGt {
- DoubleColumn(ColumnPath columnPath) {
- super(columnPath, Double.class);
- }
- }
-
- public static final class FloatColumn extends Column<Float> implements SupportsLtGt {
- FloatColumn(ColumnPath columnPath) {
- super(columnPath, Float.class);
- }
- }
-
- public static final class BooleanColumn extends Column<Boolean> implements SupportsEqNotEq {
- BooleanColumn(ColumnPath columnPath) {
- super(columnPath, Boolean.class);
- }
- }
-
- public static final class BinaryColumn extends Column<Binary> implements SupportsLtGt {
- BinaryColumn(ColumnPath columnPath) {
- super(columnPath, Binary.class);
- }
- }
-
- // base class for Eq, NotEq, Lt, Gt, LtEq, GtEq
- static abstract class ColumnFilterPredicate<T extends Comparable<T>> implements FilterPredicate, Serializable {
- private final Column<T> column;
- private final T value;
- private final String toString;
-
- protected ColumnFilterPredicate(Column<T> column, T value) {
- this.column = checkNotNull(column, "column");
-
- // Eq and NotEq allow value to be null, Lt, Gt, LtEq, GtEq however do not, so they guard against
- // null in their own constructors.
- this.value = value;
-
- String name = getClass().getSimpleName().toLowerCase();
- this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")";
- }
-
- public Column<T> getColumn() {
- return column;
- }
-
- public T getValue() {
- return value;
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ColumnFilterPredicate that = (ColumnFilterPredicate) o;
-
- if (!column.equals(that.column)) return false;
- if (value != null ? !value.equals(that.value) : that.value != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = column.hashCode();
- result = 31 * result + (value != null ? value.hashCode() : 0);
- result = 31 * result + getClass().hashCode();
- return result;
- }
- }
-
- public static final class Eq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value can be null
- Eq(Column<T> column, T value) {
- super(column, value);
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
-
- }
-
- public static final class NotEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value can be null
- NotEq(Column<T> column, T value) {
- super(column, value);
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
-
- public static final class Lt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value cannot be null
- Lt(Column<T> column, T value) {
- super(column, checkNotNull(value, "value"));
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- public static final class LtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value cannot be null
- LtEq(Column<T> column, T value) {
- super(column, checkNotNull(value, "value"));
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
-
- public static final class Gt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value cannot be null
- Gt(Column<T> column, T value) {
- super(column, checkNotNull(value, "value"));
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- public static final class GtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
-
- // value cannot be null
- GtEq(Column<T> column, T value) {
- super(column, checkNotNull(value, "value"));
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- // base class for And, Or
- private static abstract class BinaryLogicalFilterPredicate implements FilterPredicate, Serializable {
- private final FilterPredicate left;
- private final FilterPredicate right;
- private final String toString;
-
- protected BinaryLogicalFilterPredicate(FilterPredicate left, FilterPredicate right) {
- this.left = checkNotNull(left, "left");
- this.right = checkNotNull(right, "right");
- String name = getClass().getSimpleName().toLowerCase();
- this.toString = name + "(" + left + ", " + right + ")";
- }
-
- public FilterPredicate getLeft() {
- return left;
- }
-
- public FilterPredicate getRight() {
- return right;
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- BinaryLogicalFilterPredicate that = (BinaryLogicalFilterPredicate) o;
-
- if (!left.equals(that.left)) return false;
- if (!right.equals(that.right)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = left.hashCode();
- result = 31 * result + right.hashCode();
- result = 31 * result + getClass().hashCode();
- return result;
- }
- }
-
- public static final class And extends BinaryLogicalFilterPredicate {
-
- And(FilterPredicate left, FilterPredicate right) {
- super(left, right);
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- public static final class Or extends BinaryLogicalFilterPredicate {
-
- Or(FilterPredicate left, FilterPredicate right) {
- super(left, right);
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- public static class Not implements FilterPredicate, Serializable {
- private final FilterPredicate predicate;
- private final String toString;
-
- Not(FilterPredicate predicate) {
- this.predicate = checkNotNull(predicate, "predicate");
- this.toString = "not(" + predicate + ")";
- }
-
- public FilterPredicate getPredicate() {
- return predicate;
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Not not = (Not) o;
- return predicate.equals(not.predicate);
- }
-
- @Override
- public int hashCode() {
- return predicate.hashCode() * 31 + getClass().hashCode();
- }
- }
-
- public static abstract class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
- protected final Column<T> column;
-
- UserDefined(Column<T> column) {
- this.column = checkNotNull(column, "column");
- }
-
- public Column<T> getColumn() {
- return column;
- }
-
- public abstract U getUserDefinedPredicate();
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
- }
-
- public static final class UserDefinedByClass<T extends Comparable<T>, U extends UserDefinedPredicate<T>> extends UserDefined<T, U> {
- private final Class<U> udpClass;
- private final String toString;
- private static final String INSTANTIATION_ERROR_MESSAGE =
- "Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";
-
- UserDefinedByClass(Column<T> column, Class<U> udpClass) {
- super(column);
- this.udpClass = checkNotNull(udpClass, "udpClass");
- String name = getClass().getSimpleName().toLowerCase();
- this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
-
- // defensively try to instantiate the class early to make sure that it's possible
- getUserDefinedPredicate();
- }
-
- public Class<U> getUserDefinedPredicateClass() {
- return udpClass;
- }
-
- @Override
- public U getUserDefinedPredicate() {
- try {
- return udpClass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
- }
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- UserDefinedByClass that = (UserDefinedByClass) o;
-
- if (!column.equals(that.column)) return false;
- if (!udpClass.equals(that.udpClass)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = column.hashCode();
- result = 31 * result + udpClass.hashCode();
- result = result * 31 + getClass().hashCode();
- return result;
- }
- }
-
- public static final class UserDefinedByInstance<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> extends UserDefined<T, U> {
- private final String toString;
- private final U udpInstance;
-
- UserDefinedByInstance(Column<T> column, U udpInstance) {
- super(column);
- this.udpInstance = checkNotNull(udpInstance, "udpInstance");
- String name = getClass().getSimpleName().toLowerCase();
- this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpInstance + ")";
- }
-
- @Override
- public U getUserDefinedPredicate() {
- return udpInstance;
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- UserDefinedByInstance that = (UserDefinedByInstance) o;
-
- if (!column.equals(that.column)) return false;
- if (!udpInstance.equals(that.udpInstance)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = column.hashCode();
- result = 31 * result + udpInstance.hashCode();
- result = result * 31 + getClass().hashCode();
- return result;
- }
- }
-
- // Represents the inverse of a UserDefined. It is equivalent to not(userDefined), without the use
- // of the not() operator
- public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
- private final UserDefined<T, U> udp;
- private final String toString;
-
- LogicalNotUserDefined(UserDefined<T, U> userDefined) {
- this.udp = checkNotNull(userDefined, "userDefined");
- this.toString = "inverted(" + udp + ")";
- }
-
- public UserDefined<T, U> getUserDefined() {
- return udp;
- }
-
- @Override
- public <R> R accept(Visitor<R> visitor) {
- return visitor.visit(this);
- }
-
- @Override
- public String toString() {
- return toString;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- LogicalNotUserDefined that = (LogicalNotUserDefined) o;
-
- if (!udp.equals(that.udp)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = udp.hashCode();
- result = result * 31 + getClass().hashCode();
- return result;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
deleted file mode 100644
index 05fcc8c..0000000
--- a/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.predicate;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import parquet.column.ColumnDescriptor;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.Operators.And;
-import parquet.filter2.predicate.Operators.Column;
-import parquet.filter2.predicate.Operators.ColumnFilterPredicate;
-import parquet.filter2.predicate.Operators.Eq;
-import parquet.filter2.predicate.Operators.Gt;
-import parquet.filter2.predicate.Operators.GtEq;
-import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
-import parquet.filter2.predicate.Operators.Lt;
-import parquet.filter2.predicate.Operators.LtEq;
-import parquet.filter2.predicate.Operators.Not;
-import parquet.filter2.predicate.Operators.NotEq;
-import parquet.filter2.predicate.Operators.Or;
-import parquet.filter2.predicate.Operators.UserDefined;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-
-import static parquet.Preconditions.checkArgument;
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Inspects the column types found in the provided {@link FilterPredicate} and compares them
- * to the actual schema found in the parquet file. If the provided predicate's types are
- * not consistent with the file schema, and IllegalArgumentException is thrown.
- *
- * Ideally, all this would be checked at compile time, and this class wouldn't be needed.
- * If we can come up with a way to do that, we should.
- *
- * This class is stateful, cannot be reused, and is not thread safe.
- *
- * TODO: detect if a column is optional or required and validate that eq(null)
- * TODO: is not called on required fields (is that too strict?)
- * TODO: (https://issues.apache.org/jira/browse/PARQUET-44)
- */
-public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Void> {
-
- public static void validate(FilterPredicate predicate, MessageType schema) {
- checkNotNull(predicate, "predicate");
- checkNotNull(schema, "schema");
- predicate.accept(new SchemaCompatibilityValidator(schema));
- }
-
- // A map of column name to the type the user supplied for this column.
- // Used to validate that the user did not provide different types for the same
- // column.
- private final Map<ColumnPath, Class<?>> columnTypesEncountered = new HashMap<ColumnPath, Class<?>>();
-
- // the columns (keyed by path) according to the file's schema. This is the source of truth, and
- // we are validating that what the user provided agrees with these.
- private final Map<ColumnPath, ColumnDescriptor> columnsAccordingToSchema = new HashMap<ColumnPath, ColumnDescriptor>();
-
- // the original type of a column, keyed by path
- private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();
-
- private SchemaCompatibilityValidator(MessageType schema) {
-
- for (ColumnDescriptor cd : schema.getColumns()) {
- ColumnPath columnPath = ColumnPath.get(cd.getPath());
- columnsAccordingToSchema.put(columnPath, cd);
-
- OriginalType ot = schema.getType(cd.getPath()).getOriginalType();
- if (ot != null) {
- originalTypes.put(columnPath, ot);
- }
- }
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(Eq<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(NotEq<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(Lt<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(LtEq<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(Gt<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>> Void visit(GtEq<T> pred) {
- validateColumnFilterPredicate(pred);
- return null;
- }
-
- @Override
- public Void visit(And and) {
- and.getLeft().accept(this);
- and.getRight().accept(this);
- return null;
- }
-
- @Override
- public Void visit(Or or) {
- or.getLeft().accept(this);
- or.getRight().accept(this);
- return null;
- }
-
- @Override
- public Void visit(Not not) {
- not.getPredicate().accept(this);
- return null;
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(UserDefined<T, U> udp) {
- validateColumn(udp.getColumn());
- return null;
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(LogicalNotUserDefined<T, U> udp) {
- return udp.getUserDefined().accept(this);
- }
-
- private <T extends Comparable<T>> void validateColumnFilterPredicate(ColumnFilterPredicate<T> pred) {
- validateColumn(pred.getColumn());
- }
-
- private <T extends Comparable<T>> void validateColumn(Column<T> column) {
- ColumnPath path = column.getColumnPath();
-
- Class<?> alreadySeen = columnTypesEncountered.get(path);
- if (alreadySeen != null && !alreadySeen.equals(column.getColumnType())) {
- throw new IllegalArgumentException("Column: "
- + path.toDotString()
- + " was provided with different types in the same predicate."
- + " Found both: (" + alreadySeen + ", " + column.getColumnType() + ")");
- }
-
- if (alreadySeen == null) {
- columnTypesEncountered.put(path, column.getColumnType());
- }
-
- ColumnDescriptor descriptor = getColumnDescriptor(path);
-
- if (descriptor.getMaxRepetitionLevel() > 0) {
- throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
- + "Column " + path.toDotString() + " is repeated.");
- }
-
- ValidTypeMap.assertTypeValid(column, descriptor.getType(), originalTypes.get(path));
- }
-
- private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
- ColumnDescriptor cd = columnsAccordingToSchema.get(columnPath);
- checkArgument(cd != null, "Column " + columnPath + " was not found in schema!");
- return cd;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java b/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
deleted file mode 100644
index 382109a..0000000
--- a/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.predicate;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Contains statistics about a group of records
- */
-public class Statistics<T> {
- private final T min;
- private final T max;
-
- public Statistics(T min, T max) {
- this.min = checkNotNull(min, "min");
- this.max = checkNotNull(max, "max");
- }
-
- public T getMin() {
- return min;
- }
-
- public T getMax() {
- return max;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java b/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
deleted file mode 100644
index e03c945..0000000
--- a/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.predicate;
-
-/**
- * A UserDefinedPredicate decides whether a record should be kept or dropped. It first
- * inspects metadata about a group of records to see if the entire group can be dropped,
- * then inspects the actual values of a single column. These predicates can be combined into
- * a complex boolean expression via the {@link FilterApi}.
- *
- * @param <T> The type of the column this predicate is applied to.
- */
-// TODO: consider avoiding autoboxing and adding the specialized methods for each type
-// TODO: downside is that's fairly unwieldy for users
-public abstract class UserDefinedPredicate<T extends Comparable<T>> {
-
-  /**
-   * A udp must have an accessible default (no-argument) constructor.
-   * When a udp is supplied by class, only its class name is recorded, not its state. It is
-   * instantiated reflectively via this default constructor.
-   *
-   * NOTE(review): Operators.UserDefinedByInstance can also hold a udp instance that implements
-   * java.io.Serializable. Confirm which FilterApi entry points use that path before relying on
-   * the statement above.
-   */
-  public UserDefinedPredicate() { }
-
-  /**
-   * Decides whether a single record is kept.
-   *
-   * @param value the value of this predicate's column in the record
-   * @return true to keep the record with this value, false to drop it
-   */
-  public abstract boolean keep(T value);
-
-  /**
-   * Given information about a group of records (e.g. the min and max value), return true to
-   * drop all the records in this group, or false to keep them for further inspection. Returning
-   * false here will cause the records to be loaded, and each value will be passed to
-   * {@link #keep} to make the final decision.
-   * <p>
-   * It is always safe to return false here if you simply want to visit each record via the
-   * {@link #keep} method, though it is much more efficient to drop entire chunks of records
-   * here if you can.
-   *
-   * @param statistics the min and max of this predicate's column over the group of records
-   * @return true to drop the whole group, false to keep it for per-record inspection
-   */
-  public abstract boolean canDrop(Statistics<T> statistics);
-
-  /**
-   * Same as {@link #canDrop} except this method describes the logical inverse
-   * behavior of this predicate. If this predicate is passed to the not() operator, then
-   * {@link #inverseCanDrop} will be called instead of {@link #canDrop}.
-   * <p>
-   * It is always safe to return false here if you simply want to visit each record via the
-   * {@link #keep} method, though it is much more efficient to drop entire chunks of records
-   * here if you can.
-   * <p>
-   * It may be valid to simply return !canDrop(statistics) but that is not always the case.
-   * To illustrate, look at this re-implementation of a UDP that checks for values greater than 7:
-   *
-   * <pre>
-   * // This is just an example, you should use the built-in {@link FilterApi#gt} operator instead of
-   * // implementing your own like this.
-   *
-   * public class IntGreaterThan7UDP extends UserDefinedPredicate&lt;Integer&gt; {
-   *   &#64;Override
-   *   public boolean keep(Integer value) {
-   *     // here we just check if the value is greater than 7.
-   *     // here, parquet knows that if the predicate not(columnX, IntGreaterThan7UDP) is being evaluated,
-   *     // it is safe to simply use !IntGreaterThan7UDP.keep(value)
-   *     return value &gt; 7;
-   *   }
-   *
-   *   &#64;Override
-   *   public boolean canDrop(Statistics&lt;Integer&gt; statistics) {
-   *     // here we drop a group of records if they are all less than or equal to 7,
-   *     // (there can't possibly be any values greater than 7 in this group of records)
-   *     return statistics.getMax() &lt;= 7;
-   *   }
-   *
-   *   &#64;Override
-   *   public boolean inverseCanDrop(Statistics&lt;Integer&gt; statistics) {
-   *     // here the predicate not(columnX, IntGreaterThan7UDP) is being evaluated, which means we want
-   *     // to keep all records whose value is not greater than 7, or, rephrased, whose value is less than or equal to 7.
-   *     // notice what would happen if parquet just tried to evaluate !IntGreaterThan7UDP.canDrop():
-   *     // !IntGreaterThan7UDP.canDrop(stats) == !(stats.getMax() &lt;= 7) == (stats.getMax() &gt; 7)
-   *     // it would drop the following group of records: [100, 1, 2, 3], even though this group of records contains values
-   *     // less than or equal to 7.
-   *
-   *     // what we actually want to do is drop groups of records where the *min* is greater than 7, (not the max)
-   *     // for example: the group of records: [100, 8, 9, 10] has a min of 8, so there's no way there are going
-   *     // to be records with a value less than or equal to 7 in this group.
-   *     return statistics.getMin() &gt; 7;
-   *   }
-   * }
-   * </pre>
-   *
-   * @param statistics the min and max of this predicate's column over the group of records
-   * @return true to drop the whole group when evaluating the negation of this predicate,
-   *         false to keep it for per-record inspection
-   */
-  public abstract boolean inverseCanDrop(Statistics<T> statistics);
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
deleted file mode 100644
index 4cbfcc7..0000000
--- a/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.predicate;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.Operators.Column;
-import parquet.io.api.Binary;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-/**
- * Contains all valid mappings from class -> parquet type (and vice versa) for use in
- * {@link FilterPredicate}s
- *
- * This is a bit ugly, but it allows us to provide good error messages at runtime
- * when there are type mismatches.
- *
- * TODO: this has some overlap with {@link PrimitiveTypeName#javaType}
- * TODO: (https://issues.apache.org/jira/browse/PARQUET-30)
- */
-public class ValidTypeMap {
-  private ValidTypeMap() { }
-
-  // classToParquetType and parquetTypeToClass are used as a bi-directional map
-  private static final Map<Class<?>, Set<FullTypeDescriptor>> classToParquetType = new HashMap<Class<?>, Set<FullTypeDescriptor>>();
-  private static final Map<FullTypeDescriptor, Set<Class<?>>> parquetTypeToClass = new HashMap<FullTypeDescriptor, Set<Class<?>>>();
-
-  // set up the mapping in both directions
-  private static void add(Class<?> c, FullTypeDescriptor f) {
-    Set<FullTypeDescriptor> descriptors = classToParquetType.get(c);
-    if (descriptors == null) {
-      descriptors = new HashSet<FullTypeDescriptor>();
-      classToParquetType.put(c, descriptors);
-    }
-    descriptors.add(f);
-
-    Set<Class<?>> classes = parquetTypeToClass.get(f);
-    if (classes == null) {
-      classes = new HashSet<Class<?>>();
-      parquetTypeToClass.put(f, classes);
-    }
-    classes.add(c);
-  }
-
-  static {
-    // basic primitive columns
-    add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null));
-    add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null));
-    add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null));
-    add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null));
-    add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null));
-
-    // Both of these binary types are valid
-    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null));
-    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null));
-
-    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8));
-    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8));
-  }
-
-  /**
-   * Asserts that foundColumn was declared as a type that is compatible with the type for this column found
-   * in the schema of the parquet file.
-   *
-   * @throws java.lang.IllegalArgumentException if the declared class is not supported in FilterPredicates at all,
-   *         or if it is supported but does not match the (primitiveType, originalType) pair from the schema
-   *
-   * @param foundColumn the column as declared by the user
-   * @param primitiveType the primitive type according to the schema
-   * @param originalType the original type according to the schema (may be null)
-   */
-  public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) {
-    Class<T> foundColumnType = foundColumn.getColumnType();
-    ColumnPath columnPath = foundColumn.getColumnPath();
-
-    Set<FullTypeDescriptor> validTypeDescriptors = classToParquetType.get(foundColumnType);
-    FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType);
-
-    if (validTypeDescriptors == null) {
-      // the declared class is not usable in any FilterPredicate
-      StringBuilder message = new StringBuilder();
-      message
-          .append("Column ")
-          .append(columnPath.toDotString())
-          .append(" was declared as type: ")
-          .append(foundColumnType.getName())
-          .append(" which is not supported in FilterPredicates.");
-      appendSupportedTypes(message, " ", "Supported types for this column are: ", typeInFileMetaData);
-      throw new IllegalArgumentException(message.toString());
-    }
-
-    if (!validTypeDescriptors.contains(typeInFileMetaData)) {
-      // the declared class is usable, but not for the type this column has in the file
-      StringBuilder message = new StringBuilder();
-      message
-          .append("FilterPredicate column: ")
-          .append(columnPath.toDotString())
-          .append("'s declared type (")
-          .append(foundColumnType.getName())
-          .append(") does not match the schema found in file metadata. Column ")
-          .append(columnPath.toDotString())
-          .append(" is of type: ")
-          .append(typeInFileMetaData);
-      appendSupportedTypes(message, "\n", "Valid types for this column are: ", typeInFileMetaData);
-      throw new IllegalArgumentException(message.toString());
-    }
-  }
-
-  /**
-   * Appends the classes registered for {@code type} to {@code message}, preceded by {@code separator} and
-   * {@code label}. When no class is registered for {@code type} (the lookup returns null), a
-   * "no supported types" note is appended instead of the literal text "null".
-   */
-  private static void appendSupportedTypes(StringBuilder message, String separator, String label, FullTypeDescriptor type) {
-    Set<Class<?>> supportedTypes = parquetTypeToClass.get(type);
-    message.append(separator);
-    if (supportedTypes != null) {
-      message.append(label).append(supportedTypes);
-    } else {
-      message.append("There are no supported types for columns of ").append(type);
-    }
-  }
-
-  /**
-   * Value-typed (primitive type, original type) pair used as a key in both lookup maps.
-   * The original type may be null, meaning "no original type annotation".
-   */
-  private static final class FullTypeDescriptor {
-    private final PrimitiveTypeName primitiveType;
-    private final OriginalType originalType;
-
-    private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) {
-      this.primitiveType = primitiveType;
-      this.originalType = originalType;
-    }
-
-    public PrimitiveTypeName getPrimitiveType() {
-      return primitiveType;
-    }
-
-    public OriginalType getOriginalType() {
-      return originalType;
-    }
-
-    @Override
-    public String toString() {
-      return "FullTypeDescriptor(" + "PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ')';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      FullTypeDescriptor that = (FullTypeDescriptor) o;
-
-      if (originalType != that.originalType) return false;
-      if (primitiveType != that.primitiveType) return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = primitiveType != null ? primitiveType.hashCode() : 0;
-      result = 31 * result + (originalType != null ? originalType.hashCode() : 0);
-      return result;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
deleted file mode 100644
index 8b1c872..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-import parquet.io.PrimitiveColumnIO;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-
-import static parquet.Preconditions.checkArgument;
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * See {@link FilteringRecordMaterializer}
- */
-public class FilteringGroupConverter extends GroupConverter {
-  // returned for every column that no filter node inspects; a zero-length array cannot be mutated,
-  // so a single shared instance is safe
-  private static final ValueInspector[] NO_VALUE_INSPECTORS = new ValueInspector[0];
-
-  // the real converter
-  private final GroupConverter delegate;
-
-  // the path, from the root of the schema, to this converter
-  // used ultimately by the primitive converter proxy to figure
-  // out which column it represents.
-  private final List<Integer> indexFieldPath;
-
-  // for a given column, which nodes in the filter expression need
-  // to be notified of this column's value
-  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn;
-
-  // used to go from our indexFieldPath to the PrimitiveColumnIO for that column
-  private final Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath;
-
-  /**
-   * @param delegate the real converter being proxied
-   * @param indexFieldPath the path of field indices from the schema root to this converter
-   * @param valueInspectorsByColumn the filter nodes to notify, per column
-   * @param columnIOsByIndexFieldPath lookup from an index field path to its primitive column
-   */
-  public FilteringGroupConverter(
-      GroupConverter delegate,
-      List<Integer> indexFieldPath,
-      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
-      Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath) {
-
-    this.delegate = checkNotNull(delegate, "delegate");
-    this.indexFieldPath = checkNotNull(indexFieldPath, "indexFieldPath");
-    this.columnIOsByIndexFieldPath = checkNotNull(columnIOsByIndexFieldPath, "columnIOsByIndexFieldPath");
-    this.valueInspectorsByColumn = checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
-  }
-
-  // When a converter is asked for, we get the real one from the delegate, then wrap it
-  // in a filtering pass-through proxy.
-  // TODO: making the assumption that getConverter(i) is only called once, is that valid?
-  @Override
-  public Converter getConverter(int fieldIndex) {
-
-    // get the real converter from the delegate
-    Converter delegateConverter = checkNotNull(delegate.getConverter(fieldIndex), "delegate converter");
-
-    // determine the indexFieldPath for the converter proxy we're about to make, which is
-    // this converter's path + the requested fieldIndex
-    List<Integer> newIndexFieldPath = new ArrayList<Integer>(indexFieldPath.size() + 1);
-    newIndexFieldPath.addAll(indexFieldPath);
-    newIndexFieldPath.add(fieldIndex);
-
-    if (delegateConverter.isPrimitive()) {
-      PrimitiveColumnIO columnIO = getColumnIO(newIndexFieldPath);
-      ColumnPath columnPath = ColumnPath.get(columnIO.getColumnDescriptor().getPath());
-      ValueInspector[] valueInspectors = getValueInspectors(columnPath);
-      return new FilteringPrimitiveConverter(delegateConverter.asPrimitiveConverter(), valueInspectors);
-    } else {
-      return new FilteringGroupConverter(delegateConverter.asGroupConverter(), newIndexFieldPath, valueInspectorsByColumn, columnIOsByIndexFieldPath);
-    }
-
-  }
-
-  // looks up the primitive column reached by the given index field path; fails via checkArgument if unknown
-  private PrimitiveColumnIO getColumnIO(List<Integer> path) {
-    PrimitiveColumnIO found = columnIOsByIndexFieldPath.get(path);
-    checkArgument(found != null, "Did not find PrimitiveColumnIO for index field path: " + path);
-    return found;
-  }
-
-  // the inspectors registered for columnPath, or an empty array when none are registered
-  private ValueInspector[] getValueInspectors(ColumnPath columnPath) {
-    List<ValueInspector> inspectorsList = valueInspectorsByColumn.get(columnPath);
-    if (inspectorsList == null) {
-      return NO_VALUE_INSPECTORS;
-    }
-    return inspectorsList.toArray(new ValueInspector[inspectorsList.size()]);
-  }
-
-  @Override
-  public void start() {
-    delegate.start();
-  }
-
-  @Override
-  public void end() {
-    delegate.end();
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
deleted file mode 100644
index 33643ee..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import parquet.column.Dictionary;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-import parquet.io.api.Binary;
-import parquet.io.api.PrimitiveConverter;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * see {@link FilteringRecordMaterializer}
- *
- * This pass-through proxy for a delegate {@link PrimitiveConverter} also
- * updates the {@link ValueInspector}s of a {@link IncrementallyUpdatedFilterPredicate}
- */
-public class FilteringPrimitiveConverter extends PrimitiveConverter {
-  private static final String NO_DICTIONARY_SUPPORT = "FilteringPrimitiveConverter doesn't have dictionary support";
-
-  // the real converter that receives every value after the inspectors have seen it
-  private final PrimitiveConverter delegate;
-  // every filter leaf interested in this column; notified in array order
-  private final ValueInspector[] valueInspectors;
-
-  /**
-   * @param delegate the converter to forward values to (must not be null)
-   * @param valueInspectors the inspectors to notify before forwarding (must not be null, may be empty)
-   */
-  public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) {
-    this.delegate = checkNotNull(delegate, "delegate");
-    this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors");
-  }
-
-  // TODO: this works, but
-  // TODO: essentially turns off the benefits of dictionary support
-  // TODO: even if the underlying delegate supports it.
-  // TODO: we should support it here. (https://issues.apache.org/jira/browse/PARQUET-36)
-  @Override
-  public boolean hasDictionarySupport() {
-    return false;
-  }
-
-  // unreachable in practice because hasDictionarySupport() reports false
-  @Override
-  public void setDictionary(Dictionary dictionary) {
-    throw new UnsupportedOperationException(NO_DICTIONARY_SUPPORT);
-  }
-
-  @Override
-  public void addValueFromDictionary(int dictionaryId) {
-    throw new UnsupportedOperationException(NO_DICTIONARY_SUPPORT);
-  }
-
-  // Each add* method below first shows the value to every inspector, then hands it to the delegate.
-
-  @Override
-  public void addBinary(Binary value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addBinary(value);
-  }
-
-  @Override
-  public void addBoolean(boolean value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addBoolean(value);
-  }
-
-  @Override
-  public void addDouble(double value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addDouble(value);
-  }
-
-  @Override
-  public void addFloat(float value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addFloat(value);
-  }
-
-  @Override
-  public void addInt(int value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addInt(value);
-  }
-
-  @Override
-  public void addLong(long value) {
-    for (int i = 0; i < valueInspectors.length; i++) {
-      valueInspectors[i].update(value);
-    }
-    delegate.addLong(value);
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
deleted file mode 100644
index d4a2926..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-import parquet.io.PrimitiveColumnIO;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * A pass-through proxy for a {@link RecordMaterializer} that updates a {@link IncrementallyUpdatedFilterPredicate}
- * as it receives concrete values for the current record. If, after the record assembly signals that
- * there are no more values, the predicate indicates that this record should be dropped, {@link #getCurrentRecord()}
- * returns null to signal that this record is being skipped.
- * Otherwise, the record is retrieved from the delegate.
- */
-public class FilteringRecordMaterializer<T> extends RecordMaterializer<T> {
-  // the materializer that actually builds records
-  private final RecordMaterializer<T> delegate;
-
-  // filtering proxy wrapped around the delegate's root converter
-  private final FilteringGroupConverter rootConverter;
-
-  // stateful predicate fed by the proxied converters; evaluated and reset once per record
-  private final IncrementallyUpdatedFilterPredicate filterPredicate;
-
-  /**
-   * @param delegate the real record materializer
-   * @param columnIOs the primitive columns being read
-   * @param valueInspectorsByColumn the predicate leaves to notify, per column
-   * @param filterPredicate the predicate to evaluate for every record
-   */
-  public FilteringRecordMaterializer(
-      RecordMaterializer<T> delegate,
-      List<PrimitiveColumnIO> columnIOs,
-      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
-      IncrementallyUpdatedFilterPredicate filterPredicate) {
-
-    checkNotNull(columnIOs, "columnIOs");
-    checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
-    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
-    this.delegate = checkNotNull(delegate, "delegate");
-
-    // index field path -> primitive column, so converters can discover which column they represent
-    Map<List<Integer>, PrimitiveColumnIO> ioByPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
-    for (PrimitiveColumnIO io : columnIOs) {
-      List<Integer> path = getIndexFieldPathList(io);
-      ioByPath.put(path, io);
-    }
-
-    GroupConverter realRoot = delegate.getRootConverter();
-    this.rootConverter = new FilteringGroupConverter(realRoot, Collections.<Integer>emptyList(), valueInspectorsByColumn, ioByPath);
-  }
-
-  /** Returns the index field path of {@code c} as a boxed list, suitable as a map key. */
-  public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
-    int[] indices = c.getIndexFieldPath();
-    return intArrayToList(indices);
-  }
-
-  /** Copies {@code arr} into a new, mutable list of boxed integers, preserving order. */
-  public static List<Integer> intArrayToList(int[] arr) {
-    final int n = arr.length;
-    List<Integer> boxed = new ArrayList<Integer>(n);
-    for (int idx = 0; idx < n; idx++) {
-      boxed.add(arr[idx]);
-    }
-    return boxed;
-  }
-
-  /**
-   * Evaluates the predicate for the record just assembled, resets it for the next record, and then either
-   * returns the delegate's record (predicate satisfied) or null to signal that the record is skipped.
-   */
-  @Override
-  public T getCurrentRecord() {
-    final boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate);
-
-    // the predicate is reset before the delegate is consulted, whatever the outcome
-    IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
-
-    return keep ? delegate.getCurrentRecord() : null;
-  }
-
-  @Override
-  public void skipCurrentRecord() {
-    delegate.skipCurrentRecord();
-  }
-
-  @Override
-  public GroupConverter getRootConverter() {
-    return rootConverter;
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
deleted file mode 100644
index 81fc01e..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import parquet.io.api.Binary;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * A rewritten version of a {@link parquet.filter2.predicate.FilterPredicate} which receives
- * the values for a record's columns one by one and internally tracks whether the predicate is
- * satisfied, unsatisfied, or unknown.
- *
- * This is used to apply a predicate during record assembly, without assembling a second copy of
- * a record, and without building a stack of update events.
- *
- * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is
- * {@link parquet.filter2.predicate.FilterPredicate}
- */
-public interface IncrementallyUpdatedFilterPredicate {
-
-  /**
-   * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern.
-   */
-  public static interface Visitor {
-    boolean visit(ValueInspector p);
-    boolean visit(And and);
-    boolean visit(Or or);
-  }
-
-  /**
-   * A {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern.
-   *
-   * @param visitor the visitor to dispatch to
-   * @return the value returned by the visitor's matching visit method
-   */
-  boolean accept(Visitor visitor);
-
-  /**
-   * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents,
-   * and decides whether or not the predicate represented by this node is satisfied.
-   *
-   * It is stateful, and needs to be reset (see {@link #reset()}) after use.
-   *
-   * Every update method throws {@link UnsupportedOperationException} unless a subclass overrides it.
-   */
-  public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate {
-    // package private constructor
-    ValueInspector() { }
-
-    private boolean result = false;
-    private boolean isKnown = false;
-
-    // these methods signal what the value is
-    public void updateNull() { throw new UnsupportedOperationException(); }
-    public void update(int value) { throw new UnsupportedOperationException(); }
-    public void update(long value) { throw new UnsupportedOperationException(); }
-    public void update(double value) { throw new UnsupportedOperationException(); }
-    public void update(float value) { throw new UnsupportedOperationException(); }
-    public void update(boolean value) { throw new UnsupportedOperationException(); }
-    public void update(Binary value) { throw new UnsupportedOperationException(); }
-
-    /**
-     * Reset to clear state and begin evaluating the next record.
-     */
-    public final void reset() {
-      isKnown = false;
-      result = false;
-    }
-
-    /**
-     * Subclasses should call this method to signal that the result of this predicate is known.
-     *
-     * @param result whether this leaf's predicate is satisfied
-     * @throws IllegalStateException if a result was already set since the last {@link #reset()}
-     */
-    protected final void setResult(boolean result) {
-      if (isKnown) {
-        throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!"
-            + " Did you forget to call reset()?");
-      }
-      this.result = result;
-      this.isKnown = true;
-    }
-
-    /**
-     * Should only be called if {@link #isKnown()} returns true.
-     *
-     * @return the result passed to the most recent {@link #setResult(boolean)} call
-     * @throws IllegalStateException if no result has been set since construction or the last {@link #reset()}
-     */
-    public final boolean getResult() {
-      if (!isKnown) {
-        throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!");
-      }
-      return result;
-    }
-
-    /**
-     * Returns true if a result has been set via {@link #setResult(boolean)} since construction
-     * or the last {@link #reset()}, false otherwise.
-     */
-    public final boolean isKnown() {
-      return isKnown;
-    }
-
-    @Override
-    public boolean accept(Visitor visitor) {
-      return visitor.visit(this);
-    }
-  }
-
-  // base class for and / or: holds the two (non-null) child predicates
-  static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
-    private final IncrementallyUpdatedFilterPredicate left;
-    private final IncrementallyUpdatedFilterPredicate right;
-
-    BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
-      this.left = checkNotNull(left, "left");
-      this.right = checkNotNull(right, "right");
-    }
-
-    public final IncrementallyUpdatedFilterPredicate getLeft() {
-      return left;
-    }
-
-    public final IncrementallyUpdatedFilterPredicate getRight() {
-      return right;
-    }
-  }
-
-  /** The "or" node of the tree; how it is evaluated is up to the {@link Visitor}. */
-  public static final class Or extends BinaryLogical {
-    Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
-      super(left, right);
-    }
-
-    @Override
-    public boolean accept(Visitor visitor) {
-      return visitor.visit(this);
-    }
-  }
-
-  /** The "and" node of the tree; how it is evaluated is up to the {@link Visitor}. */
-  public static final class And extends BinaryLogical {
-    And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
-      super(left, right);
-    }
-
-    @Override
-    public boolean accept(Visitor visitor) {
-      return visitor.visit(this);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
deleted file mode 100644
index be8723b..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.FilterPredicate.Visitor;
-import parquet.filter2.predicate.Operators.And;
-import parquet.filter2.predicate.Operators.Not;
-import parquet.filter2.predicate.Operators.Or;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-
-import static parquet.Preconditions.checkArgument;
-
-/**
- * The implementation of this abstract class is auto-generated by
- * {@link parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}
- *
- * Constructs a {@link IncrementallyUpdatedFilterPredicate} from a {@link parquet.filter2.predicate.FilterPredicate}
- * This is how records are filtered during record assembly. The implementation is generated in order to avoid autoboxing.
- *
- * Note: the supplied predicate must not contain any instances of the not() operator as this is not
- * supported by this filter.
- *
- * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
- * in a form that doesn't make use of the not() operator.
- *
- * the supplied predicate should also have already been run through
- * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
- * to make sure it is compatible with the schema of this file.
- *
- * TODO: UserDefinedPredicates still autobox however
- */
-public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
-  // set once build() has started; a builder instance may only be used for a single predicate
-  private boolean built = false;
-  // populated by generated subclasses (via addValueInspector) while build() traverses the predicate
-  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
-
-  public IncrementallyUpdatedFilterPredicateBuilderBase() { }
-
-  /**
-   * Converts {@code pred} into an {@link IncrementallyUpdatedFilterPredicate}, registering a
-   * {@link ValueInspector} per leaf in {@link #getValueInspectorsByColumn()}.
-   * May be called at most once per builder instance (enforced via checkArgument).
-   *
-   * @param pred the predicate to convert; must not contain not() nodes
-   * @return the incrementally updated form of {@code pred}
-   * @throws IllegalArgumentException if {@code pred} contains a not() node
-   */
-  public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
-    checkArgument(!built, "This builder has already been used");
-    // Mark the builder as used BEFORE traversing: a traversal that fails part-way (e.g. on a not() node)
-    // has already registered some ValueInspectors, so retrying with this instance would mix stale
-    // inspectors into the next result.
-    built = true;
-    return pred.accept(this);
-  }
-
-  /** Registers {@code valueInspector} as interested in the values of {@code columnPath}. */
-  protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
-    List<ValueInspector> valueInspectors = valueInspectorsByColumn.get(columnPath);
-    if (valueInspectors == null) {
-      valueInspectors = new ArrayList<ValueInspector>();
-      valueInspectorsByColumn.put(columnPath, valueInspectors);
-    }
-    valueInspectors.add(valueInspector);
-  }
-
-  /** Returns the (live, mutable) map of inspectors registered so far, keyed by column. */
-  public Map<ColumnPath, List<ValueInspector>> getValueInspectorsByColumn() {
-    return valueInspectorsByColumn;
-  }
-
-  @Override
-  public final IncrementallyUpdatedFilterPredicate visit(And and) {
-    return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));
-  }
-
-  @Override
-  public final IncrementallyUpdatedFilterPredicate visit(Or or) {
-    return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this));
-  }
-
-  // not() must have been rewritten away beforehand (see LogicalInverseRewriter)
-  @Override
-  public final IncrementallyUpdatedFilterPredicate visit(Not not) {
-    throw new IllegalArgumentException(
-        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
deleted file mode 100644
index f223029..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not.
- * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state
- * represent columns with a null value, and updates them accordingly.
- * <p>
- * {@code And} / {@code Or} nodes short-circuit: when the left side decides the outcome, the right
- * side is not visited, so its inspectors are not updated by this evaluator.
- * <p>
- * The evaluator holds no state; a single shared instance is used through {@link #evaluate}.
- *
- * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome
- * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37)
- */
-public final class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor {
-  private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE =
-      new IncrementallyUpdatedFilterPredicateEvaluator();
-
-  /**
-   * Evaluates {@code pred} against the values its inspectors have seen.
-   *
-   * @param pred the predicate to evaluate; must not be null
-   * @return whether {@code pred} is satisfied
-   */
-  public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) {
-    checkNotNull(pred, "pred");
-    return pred.accept(INSTANCE);
-  }
-
-  // singleton: callers go through evaluate()
-  private IncrementallyUpdatedFilterPredicateEvaluator() {}
-
-  /** An inspector that has not reached a known state is treated as having seen a null value. */
-  @Override
-  public boolean visit(ValueInspector p) {
-    if (!p.isKnown()) {
-      p.updateNull();
-    }
-    return p.getResult();
-  }
-
-  @Override
-  public boolean visit(And and) {
-    return and.getLeft().accept(this) && and.getRight().accept(this);
-  }
-
-  @Override
-  public boolean visit(Or or) {
-    return or.getLeft().accept(this) || or.getRight().accept(this);
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
deleted file mode 100644
index 74ceca3..0000000
--- a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
-import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Walks an {@link IncrementallyUpdatedFilterPredicate} and calls {@code reset()} on every
- * {@link ValueInspector} in it. Both sides of every {@code And} / {@code Or} are always visited.
- * The boolean returned by the visit methods carries no meaning here and is always {@code false}.
- */
-public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor {
-  private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE =
-      new IncrementallyUpdatedFilterPredicateResetter();
-
-  /**
-   * Resets every {@link ValueInspector} reachable from {@code pred}.
-   *
-   * @param pred the predicate tree to reset; must not be null
-   */
-  public static void reset(IncrementallyUpdatedFilterPredicate pred) {
-    checkNotNull(pred, "pred");
-    pred.accept(INSTANCE);
-  }
-
-  // singleton: callers go through reset()
-  private IncrementallyUpdatedFilterPredicateResetter() { }
-
-  @Override
-  public boolean visit(ValueInspector p) {
-    p.reset();
-    return false;
-  }
-
-  @Override
-  public boolean visit(And and) {
-    return resetBoth(and.getLeft(), and.getRight());
-  }
-
-  @Override
-  public boolean visit(Or or) {
-    return resetBoth(or.getLeft(), or.getRight());
-  }
-
-  /** Resets the left subtree, then the right one; never short-circuits. */
-  private boolean resetBoth(IncrementallyUpdatedFilterPredicate left,
-                            IncrementallyUpdatedFilterPredicate right) {
-    left.accept(this);
-    right.accept(this);
-    return false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
deleted file mode 100644
index aade88c..0000000
--- a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.io;
-
-import static parquet.Log.DEBUG;
-import parquet.Log;
-import parquet.column.ColumnReadStore;
-import parquet.io.RecordReaderImplementation.State;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.io.api.RecordMaterializer;
-
-// TODO(julien): this class appears to be unused -- can it be nuked? - todd
-/**
- * Base class for record readers that push a record into {@link #recordConsumer} through the
- * protected startMessage/startGroup/addPrimitiveINT32/INT64/BINARY/endGroup/endMessage helpers.
- * Subclasses implement {@link #readOneRecord()}; {@link #read()} calls it and then returns
- * {@code recordMaterializer.getCurrentRecord()}.
- * <p>
- * Ending a field is deferred: after a value (or group) is emitted, its field name/index are kept in
- * {@code endField}/{@code endIndex} instead of calling {@code recordConsumer.endField} right away.
- * If the next field started has the same index, the still-open field is reused; otherwise, and in
- * endGroup/endMessage, the pending field is ended first.
- */
-public abstract class BaseRecordReader<T> extends RecordReader<T> {
- private static final Log LOG = Log.getLog(BaseRecordReader.class);
-
- // NOTE(review): these public fields are never assigned in this class; presumably set by whoever
- // creates the reader. columnStore is not read anywhere in this class.
- public RecordConsumer recordConsumer;
- public RecordMaterializer<T> recordMaterializer;
- public ColumnReadStore columnStore;
- /** Runs {@link #readOneRecord()} and returns the record currently held by the materializer. */
- @Override
- public T read() {
- readOneRecord();
- return recordMaterializer.getCurrentRecord();
- }
-
- /** Implemented by subclasses; presumably emits one record through the helpers below. */
- protected abstract void readOneRecord();
-
- // lookup table used by getCaseId. NOTE(review): never assigned in this class; it must be
- // populated (presumably by a subclass) before getCaseId is called.
- State[] caseLookup;
-
- // name of the field whose endField call is pending, or null when nothing is pending
- private String endField;
-
- // index of the pending field held in endField
- private int endIndex;
-
- // debug-only trace of the current level; no effect unless DEBUG is set
- protected void currentLevel(int currentLevel) {
- if (DEBUG) LOG.debug("currentLevel: "+currentLevel);
- }
-
- // debug-only trace message, prefixed with "bc: "
- protected void log(String message) {
- if (DEBUG) LOG.debug("bc: "+message);
- }
-
- // returns the id of the case that caseLookup[state] selects for the given levels
- final protected int getCaseId(int state, int currentLevel, int d, int nextR) {
- return caseLookup[state].getCase(currentLevel, d, nextR).getID();
- }
-
- // clears any pending field (without ending it) and starts a new message
- final protected void startMessage() {
- // reset state
- endField = null;
- if (DEBUG) LOG.debug("startMessage()");
- recordConsumer.startMessage();
- }
-
- // opens the field (or reuses the pending one, see startField), then starts a group inside it
- final protected void startGroup(String field, int index) {
- startField(field, index);
- if (DEBUG) LOG.debug("startGroup()");
- recordConsumer.startGroup();
- }
-
- // starts a field, unless the pending field has the same index: then that open field is reused
- private void startField(String field, int index) {
- if (DEBUG) LOG.debug("startField("+field+","+index+")");
- if (endField != null && index == endIndex) {
- // skip the close/open tag
- endField = null;
- } else {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- recordConsumer.startField(field, index);
- }
- }
-
- // emits a long value in the given field; the end of the field is deferred
- final protected void addPrimitiveINT64(String field, int index, long value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addLong("+value+")");
- recordConsumer.addLong(value);
- endField(field, index);
- }
-
- // ends any previously pending field, then makes (field, index) the new pending one
- private void endField(String field, int index) {
- if (DEBUG) LOG.debug("endField("+field+","+index+")");
- if (endField != null) {
- recordConsumer.endField(endField, endIndex);
- }
- endField = field;
- endIndex = index;
- }
-
- // emits a binary value in the given field; the end of the field is deferred
- final protected void addPrimitiveBINARY(String field, int index, Binary value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addBinary("+value+")");
- recordConsumer.addBinary(value);
- endField(field, index);
- }
-
- // emits an int value in the given field; the end of the field is deferred
- final protected void addPrimitiveINT32(String field, int index, int value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addInteger("+value+")");
- recordConsumer.addInteger(value);
- endField(field, index);
- }
-
- // ends the pending field inside the group, ends the group, then defers the end of the field
- // that holds the group
- final protected void endGroup(String field, int index) {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- if (DEBUG) LOG.debug("endGroup()");
- recordConsumer.endGroup();
- endField(field, index);
- }
-
- // ends the pending field, if any, and then the message
- final protected void endMessage() {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- if (DEBUG) LOG.debug("endMessage()");
- recordConsumer.endMessage();
- }
-
- // always throws a ParquetDecodingException carrying the given message
- protected void error(String message) {
- throw new ParquetDecodingException(message);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/io/ColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/ColumnIO.java b/parquet-column/src/main/java/parquet/io/ColumnIO.java
deleted file mode 100644
index c00f083..0000000
--- a/parquet-column/src/main/java/parquet/io/ColumnIO.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.io;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import parquet.Log;
-import parquet.schema.Type;
-import parquet.schema.Type.Repetition;
-
-/**
- * a structure used to serialize deserialize records
- *
- * @author Julien Le Dem
- *
- */
-abstract public class ColumnIO {
-
- static final boolean DEBUG = Log.DEBUG;
-
- private final GroupColumnIO parent;
- private final Type type;
- private final String name;
- private final int index;
- private int repetitionLevel;
- private int definitionLevel;
- private String[] fieldPath;
- private int[] indexFieldPath;
-
-
- ColumnIO(Type type, GroupColumnIO parent, int index) {
- this.type = type;
- this.parent = parent;
- this.index = index;
- this.name = type.getName();
- }
-
- String[] getFieldPath() {
- return fieldPath;
- }
-
- public String getFieldPath(int level) {
- return fieldPath[level];
- }
-
- public int[] getIndexFieldPath() {
- return indexFieldPath;
- }
-
- public int getIndexFieldPath(int level) {
- return indexFieldPath[level];
- }
-
- public int getIndex() {
- return this.index;
- }
-
- public String getName() {
- return name;
- }
-
- int getRepetitionLevel() {
- return repetitionLevel;
- }
-
- int getDefinitionLevel() {
- return definitionLevel;
- }
-
- void setRepetitionLevel(int repetitionLevel) {
- this.repetitionLevel = repetitionLevel;
- }
-
- void setDefinitionLevel(int definitionLevel) {
- this.definitionLevel = definitionLevel;
- }
-
- void setFieldPath(String[] fieldPath, int[] indexFieldPath) {
- this.fieldPath = fieldPath;
- this.indexFieldPath = indexFieldPath;
- }
-
- public Type getType() {
- return type;
- }
-
- void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List<ColumnIO> repetition, List<ColumnIO> path) {
- setRepetitionLevel(r);
- setDefinitionLevel(d);
- setFieldPath(fieldPath, indexFieldPath);
- }
-
- abstract List<String[]> getColumnNames();
-
- public GroupColumnIO getParent() {
- return parent;
- }
-
- abstract PrimitiveColumnIO getLast();
- abstract PrimitiveColumnIO getFirst();
-
- ColumnIO getParent(int r) {
- if (getRepetitionLevel() == r && getType().isRepetition(Repetition.REPEATED)) {
- return this;
- } else if (getParent()!=null && getParent().getDefinitionLevel()>=r) {
- return getParent().getParent(r);
- } else {
- throw new InvalidRecordException("no parent("+r+") for "+Arrays.toString(this.getFieldPath()));
- }
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName()+" "+type.getName()
- +" r:"+repetitionLevel
- +" d:"+definitionLevel
- +" "+Arrays.toString(fieldPath);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
deleted file mode 100644
index 66b0b31..0000000
--- a/parquet-column/src/main/java/parquet/io/ColumnIOFactory.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.io;
-
-import static parquet.schema.Type.Repetition.REQUIRED;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.Type;
-import parquet.schema.TypeVisitor;
-
-/**
- * Factory constructing the ColumnIO structure from the schema
- *
- * @author Julien Le Dem
- *
- */
-public class ColumnIOFactory {
-
-  /**
-   * Schema visitor that walks the file schema and builds the matching {@link MessageColumnIO},
-   * keeping only the fields that the requested schema also contains.
-   */
-  public class ColumnIOCreatorVisitor implements TypeVisitor {
-
-    private MessageColumnIO columnIO;
-    private GroupColumnIO current;
-    private final List<PrimitiveColumnIO> leaves = new ArrayList<PrimitiveColumnIO>();
-    private final boolean validating;
-    private final MessageType requestedSchema;
-    // index and type, in the requested schema, of the field currently being visited
-    private int currentRequestedIndex;
-    private Type currentRequestedType;
-    private final boolean strictTypeChecking;
-
-    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
-      this(validating, requestedSchema, true);
-    }
-
-    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
-      this.validating = validating;
-      this.requestedSchema = requestedSchema;
-      this.strictTypeChecking = strictTypeChecking;
-    }
-
-    @Override
-    public void visit(MessageType messageType) {
-      columnIO = new MessageColumnIO(requestedSchema, validating);
-      visitChildren(columnIO, messageType, requestedSchema);
-      columnIO.setLevels();
-      columnIO.setLeaves(leaves);
-    }
-
-    @Override
-    public void visit(GroupType groupType) {
-      // a group in the file can only be read as a group
-      if (currentRequestedType.isPrimitive()) {
-        incompatibleSchema(groupType, currentRequestedType);
-      }
-      GroupColumnIO newIO = new GroupColumnIO(groupType, current, currentRequestedIndex);
-      current.add(newIO);
-      visitChildren(newIO, groupType, currentRequestedType.asGroupType());
-    }
-
-    private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType requestedGroupType) {
-      GroupColumnIO oldIO = current;
-      current = newIO;
-      for (Type type : groupType.getFields()) {
-        // only fields present in both schemas are built; file fields that were not requested are
-        // skipped, and requested fields the file does not contain are never visited (they stay null)
-        if (requestedGroupType.containsField(type.getName())) {
-          currentRequestedIndex = requestedGroupType.getFieldIndex(type.getName());
-          currentRequestedType = requestedGroupType.getType(currentRequestedIndex);
-          if (currentRequestedType.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
-            incompatibleSchema(type, currentRequestedType);
-          }
-          type.accept(this);
-        }
-      }
-      current = oldIO;
-    }
-
-    @Override
-    public void visit(PrimitiveType primitiveType) {
-      // the requested type must be primitive; with strict checking the primitive type names must match too
-      if (!currentRequestedType.isPrimitive() ||
-          (this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
-        incompatibleSchema(primitiveType, currentRequestedType);
-      }
-      PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
-      current.add(newIO);
-      leaves.add(newIO);
-    }
-
-    private void incompatibleSchema(Type fileType, Type requestedType) {
-      throw new ParquetDecodingException("The requested schema is not compatible with the file schema. incompatible types: " + requestedType + " != " + fileType);
-    }
-
-    /** @return the structure built by the last {@code visit(MessageType)}, or null before any visit */
-    public MessageColumnIO getColumnIO() {
-      return columnIO;
-    }
-
-  }
-
-  private final boolean validating;
-
-  /**
-   * validation is off by default
-   */
-  public ColumnIOFactory() {
-    this(false);
-  }
-
-  /**
-   * @param validating to turn validation on
-   */
-  public ColumnIOFactory(boolean validating) {
-    this.validating = validating;
-  }
-
-  /**
-   * Builds the structure with strict primitive type checking.
-   *
-   * @param requestedSchema the requested schema we want to read/write
-   * @param fileSchema the file schema (when reading it can be different from the requested schema)
-   * @return the corresponding serializing/deserializing structure
-   */
-  public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
-    return getColumnIO(requestedSchema, fileSchema, true);
-  }
-
-  /**
-   * @param requestedSchema the requested schema we want to read/write
-   * @param fileSchema the file schema (when reading it can be different from the requested schema)
-   * @param strict should file type and requested primitive types match
-   * @return the corresponding serializing/deserializing structure
-   */
-  public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
-    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
-    fileSchema.accept(visitor);
-    return visitor.getColumnIO();
-  }
-
-  /**
-   * @param schema the schema we want to read/write (used as both requested and file schema)
-   * @return the corresponding serializing/deserializing structure
-   */
-  public MessageColumnIO getColumnIO(MessageType schema) {
-    return this.getColumnIO(schema, schema);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/io/CompilationException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/CompilationException.java b/parquet-column/src/main/java/parquet/io/CompilationException.java
deleted file mode 100644
index d116cf5..0000000
--- a/parquet-column/src/main/java/parquet/io/CompilationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.io;
-
-import parquet.ParquetRuntimeException;
-
-/**
- * thrown when a problem occurred while compiling the column reader
- *
- * @author Julien Le Dem
- *
- */
-public class CompilationException extends ParquetRuntimeException {
-  private static final long serialVersionUID = 1L;
-
-  /** Creates the exception without an explicit message or cause. */
-  public CompilationException() {
-  }
-
-  /**
-   * @param message description of the failure
-   * @param cause the underlying error
-   */
-  public CompilationException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  /**
-   * @param message description of the failure
-   */
-  public CompilationException(String message) {
-    super(message);
-  }
-
-  /**
-   * @param cause the underlying error
-   */
-  public CompilationException(Throwable cause) {
-    super(cause);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/io/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/EmptyRecordReader.java b/parquet-column/src/main/java/parquet/io/EmptyRecordReader.java
deleted file mode 100644
index bdd7fed..0000000
--- a/parquet-column/src/main/java/parquet/io/EmptyRecordReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.io;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-
-/**
- * Reader used for an empty schema: every read only signals start and end to the root converter,
- * then returns whatever the materializer holds as the current record.
- *
- * @author Mickael Lacour <m....@criteo.com>
- *
- * @param <T> the type of the materialized record
- */
-class EmptyRecordReader<T> extends RecordReader<T> {
-
-  private final RecordMaterializer<T> recordMaterializer;
-  // the materializer's root converter; only start() and end() are ever called on it
-  private final GroupConverter rootConverter;
-
-  public EmptyRecordReader(RecordMaterializer<T> recordMaterializer) {
-    this.recordMaterializer = recordMaterializer;
-    // TODO: validator(wrap(recordMaterializer), validating, root.getType());
-    this.rootConverter = recordMaterializer.getRootConverter();
-  }
-
-  /**
-   * Emits an empty record (start immediately followed by end) and returns the materializer's
-   * current record.
-   *
-   * @see parquet.io.RecordReader#read()
-   */
-  @Override
-  public T read() {
-    rootConverter.start();
-    rootConverter.end();
-    return recordMaterializer.getCurrentRecord();
-  }
-}