You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:35 UTC
[38/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
new file mode 100644
index 0000000..4f8b10d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Contains all valid mappings from class -> parquet type (and vice versa) for use in
+ * {@link FilterPredicate}s
+ *
+ * This is a bit ugly, but it allows us to provide good error messages at runtime
+ * when there are type mismatches.
+ *
+ * TODO: this has some overlap with {@link PrimitiveTypeName#javaType}
+ * TODO: (https://issues.apache.org/jira/browse/PARQUET-30)
+ */
+public class ValidTypeMap {
+ private ValidTypeMap() { }
+
+ // classToParquetType and parquetTypeToClass are used as a bi-directional map
+ private static final Map<Class<?>, Set<FullTypeDescriptor>> classToParquetType = new HashMap<Class<?>, Set<FullTypeDescriptor>>();
+ private static final Map<FullTypeDescriptor, Set<Class<?>>> parquetTypeToClass = new HashMap<FullTypeDescriptor, Set<Class<?>>>();
+
+ // records the pairing in both directions: class -> descriptors and descriptor -> classes
+ private static void add(Class<?> c, FullTypeDescriptor f) {
+ // forward lookup; the set for a class is created the first time that class is seen
+ if (!classToParquetType.containsKey(c)) {
+ classToParquetType.put(c, new HashSet<FullTypeDescriptor>());
+ }
+ Set<FullTypeDescriptor> descriptorsForClass = classToParquetType.get(c);
+ descriptorsForClass.add(f);
+
+ // reverse lookup; the set for a descriptor is created the first time it is seen
+ if (!parquetTypeToClass.containsKey(f)) {
+ parquetTypeToClass.put(f, new HashSet<Class<?>>());
+ }
+ Set<Class<?>> classesForDescriptor = parquetTypeToClass.get(f);
+ classesForDescriptor.add(c);
+ }
+
+ static {
+ // basic primitive columns: each boxed java type is registered for one un-annotated parquet type
+ add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null));
+ add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null));
+ add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null));
+ add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null));
+ add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null));
+
+ // Binary is valid for both variable-length and fixed-length byte array columns
+ add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null));
+ add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null));
+ // ... and for the same two physical types when they carry the UTF8 annotation
+ add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8));
+ add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8));
+ }
+
+ /**
+ * Asserts that foundColumn was declared as a type that is compatible with the type for this column found
+ * in the schema of the parquet file.
+ *
+ * @param foundColumn the column as declared by the user
+ * @param primitiveType the primitive type according to the schema
+ * @param originalType the original type according to the schema
+ * @throws java.lang.IllegalArgumentException if the declared type is not supported in FilterPredicates,
+ * or is not one of the valid types for the (primitiveType, originalType) pair found in the schema
+ */
+ public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) {
+ Class<T> foundColumnType = foundColumn.getColumnType();
+ ColumnPath columnPath = foundColumn.getColumnPath();
+
+ Set<FullTypeDescriptor> validTypeDescriptors = classToParquetType.get(foundColumnType);
+ FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType);
+ // case 1: the declared class has no registered parquet types at all
+ if (validTypeDescriptors == null) {
+ StringBuilder message = new StringBuilder();
+ message
+ .append("Column ")
+ .append(columnPath.toDotString())
+ .append(" was declared as type: ")
+ .append(foundColumnType.getName())
+ .append(" which is not supported in FilterPredicates.");
+
+ Set<Class<?>> supportedTypes = parquetTypeToClass.get(typeInFileMetaData);
+ if (supportedTypes != null) {
+ message
+ .append(" Supported types for this column are: ")
+ .append(supportedTypes);
+ } else {
+ message.append(" There are no supported types for columns of " + typeInFileMetaData);
+ }
+ throw new IllegalArgumentException(message.toString());
+ }
+ // case 2: the declared class is registered, but not for this column's type in the file
+ if (!validTypeDescriptors.contains(typeInFileMetaData)) {
+ StringBuilder message = new StringBuilder();
+ message
+ .append("FilterPredicate column: ")
+ .append(columnPath.toDotString())
+ .append("'s declared type (")
+ .append(foundColumnType.getName())
+ .append(") does not match the schema found in file metadata. Column ")
+ .append(columnPath.toDotString())
+ .append(" is of type: ")
+ .append(typeInFileMetaData)
+ .append("\nValid types for this column are: ")
+ .append(parquetTypeToClass.get(typeInFileMetaData));
+ throw new IllegalArgumentException(message.toString());
+ }
+ }
+
+ private static final class FullTypeDescriptor {
+ // physical parquet type of the column
+ private final PrimitiveTypeName primitiveType;
+ // logical annotation; null for the un-annotated registrations made in the static block
+ private final OriginalType originalType;
+
+ private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) {
+ this.primitiveType = primitiveType;
+ this.originalType = originalType;
+ }
+
+ public PrimitiveTypeName getPrimitiveType() { return primitiveType; }
+
+ public OriginalType getOriginalType() { return originalType; }
+
+ // this text is embedded in the error messages built by assertTypeValid
+ @Override
+ public String toString() {
+ return "FullTypeDescriptor(PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ")";
+ }
+
+ // fields are compared by identity, exactly as before (NOTE(review): both look like enums - confirm)
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ FullTypeDescriptor that = (FullTypeDescriptor) other;
+ return originalType == that.originalType && primitiveType == that.primitiveType;
+ }
+
+ // 31 * hash(primitiveType) + hash(originalType), where a null field hashes to 0
+ @Override
+ public int hashCode() {
+ int primitiveHash = (primitiveType == null) ? 0 : primitiveType.hashCode();
+ int originalHash = (originalType == null) ? 0 : originalType.hashCode();
+ return 31 * primitiveHash + originalHash;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
new file mode 100644
index 0000000..a76b5ee
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * See {@link FilteringRecordMaterializer}
+ */
+public class FilteringGroupConverter extends GroupConverter {
+ // the real converter
+ private final GroupConverter delegate;
+
+ // the path, from the root of the schema, to this converter
+ // used ultimately by the primitive converter proxy to figure
+ // out which column it represents.
+ private final List<Integer> indexFieldPath;
+
+ // for a given column, which nodes in the filter expression need
+ // to be notified of this column's value
+ private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn;
+
+ // used to go from our indexFieldPath to the PrimitiveColumnIO for that column
+ private final Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath;
+
+ public FilteringGroupConverter(
+ GroupConverter delegate,
+ List<Integer> indexFieldPath,
+ Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
+ Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath) {
+ // every argument goes through checkNotNull; the references are stored as given, without copying
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.indexFieldPath = checkNotNull(indexFieldPath, "indexFieldPath");
+ this.columnIOsByIndexFieldPath = checkNotNull(columnIOsByIndexFieldPath, "columnIOsByIndexFieldPath");
+ this.valueInspectorsByColumn = checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+ }
+
+ // Every converter handed out here is a filtering proxy wrapped around the delegate's own
+ // converter for the same field.
+ // TODO: making the assumption that getConverter(i) is only called once, is that valid?
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ // the delegate must supply a converter for this field
+ Converter wrapped = checkNotNull(delegate.getConverter(fieldIndex), "delegate converter");
+
+ // the proxy's path from the schema root is this converter's path followed by fieldIndex
+ List<Integer> childPath = new ArrayList<Integer>(indexFieldPath.size() + 1);
+ childPath.addAll(indexFieldPath);
+ childPath.add(fieldIndex);
+
+ // groups recurse: the nested proxy carries the extended path and the same two lookup maps
+ if (!wrapped.isPrimitive()) {
+ return new FilteringGroupConverter(
+ wrapped.asGroupConverter(), childPath, valueInspectorsByColumn, columnIOsByIndexFieldPath);
+ }
+
+ // primitives: resolve the column for this path, then attach the inspectors registered for it
+ PrimitiveColumnIO columnIO = getColumnIO(childPath);
+ ColumnPath columnPath = ColumnPath.get(columnIO.getColumnDescriptor().getPath());
+ ValueInspector[] inspectors = getValueInspectors(columnPath);
+ return new FilteringPrimitiveConverter(wrapped.asPrimitiveConverter(), inspectors);
+ }
+ // resolves a field-index path to its column; a path with no entry fails the checkArgument precondition
+ private PrimitiveColumnIO getColumnIO(List<Integer> indexFieldPath) {
+ PrimitiveColumnIO found = columnIOsByIndexFieldPath.get(indexFieldPath);
+ checkArgument(found != null, "Did not find PrimitiveColumnIO for index field path " + indexFieldPath);
+ return found;
+ }
+
+ private ValueInspector[] getValueInspectors(ColumnPath columnPath) {
+ List<ValueInspector> registered = valueInspectorsByColumn.get(columnPath);
+ // a column with no registered inspectors yields an empty array rather than null
+ if (registered != null) {
+ return registered.toArray(new ValueInspector[registered.size()]);
+ }
+ return new ValueInspector[0];
+ }
+
+ @Override
+ public void start() {
+ delegate.start(); // forwarded unchanged; nothing else happens at the start of a group
+ }
+
+ @Override
+ public void end() {
+ delegate.end(); // forwarded unchanged, like start()
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
new file mode 100644
index 0000000..18edb64
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * see {@link FilteringRecordMaterializer}
+ *
+ * This pass-through proxy for a delegate {@link PrimitiveConverter} also
+ * updates the {@link ValueInspector}s of an {@link IncrementallyUpdatedFilterPredicate}
+ */
+ public class FilteringPrimitiveConverter extends PrimitiveConverter {
+ private final PrimitiveConverter delegate; // the real converter that every value is forwarded to
+ private final ValueInspector[] valueInspectors; // updated with each value before it is forwarded
+
+ public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors");
+ }
+
+ // Reports no dictionary support and rejects both dictionary callbacks below.
+ // TODO: this works, but essentially turns off the benefits of dictionary support
+ // TODO: even if the underlying delegate supports it.
+ // TODO: we should support it here. (https://issues.apache.org/jira/browse/PARQUET-36)
+ @Override
+ public boolean hasDictionarySupport() {
+ return false;
+ }
+
+ @Override
+ public void setDictionary(Dictionary dictionary) {
+ throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+ }
+
+ @Override
+ public void addValueFromDictionary(int dictionaryId) {
+ throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+ }
+ // Each add* method below updates every ValueInspector first, then forwards the value to the delegate.
+ @Override
+ public void addBinary(Binary value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addBinary(value);
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addBoolean(value);
+ }
+
+ @Override
+ public void addDouble(double value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addDouble(value);
+ }
+
+ @Override
+ public void addFloat(float value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addFloat(value);
+ }
+
+ @Override
+ public void addInt(int value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addInt(value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addLong(value);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
new file mode 100644
index 0000000..d8fa677
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A pass-through proxy for a {@link RecordMaterializer} that updates an {@link IncrementallyUpdatedFilterPredicate}
+ * as it receives concrete values for the current record. If, after the record assembly signals that
+ * there are no more values, the predicate indicates that this record should be dropped, {@link #getCurrentRecord()}
+ * returns null to signal that this record is being skipped.
+ * Otherwise, the record is retrieved from the delegate.
+ */
+public class FilteringRecordMaterializer<T> extends RecordMaterializer<T> {
+ // the real record materializer
+ private final RecordMaterializer<T> delegate;
+
+ // the proxied root converter
+ private final FilteringGroupConverter rootConverter;
+
+ // the predicate
+ private final IncrementallyUpdatedFilterPredicate filterPredicate;
+
+ public FilteringRecordMaterializer(
+ RecordMaterializer<T> delegate,
+ List<PrimitiveColumnIO> columnIOs,
+ Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
+ IncrementallyUpdatedFilterPredicate filterPredicate) {
+ // all four arguments are null-checked; only filterPredicate and delegate are kept as fields here
+ checkNotNull(columnIOs, "columnIOs");
+ checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+ this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+ this.delegate = checkNotNull(delegate, "delegate");
+
+ // keep track of which path of indices leads to which primitive column
+ Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
+
+ for (PrimitiveColumnIO c : columnIOs) {
+ columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c);
+ }
+
+ // create a proxy for the delegate's root converter; the root's own index path is the empty list
+ this.rootConverter = new FilteringGroupConverter(
+ delegate.getRootConverter(), Collections.<Integer>emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath);
+ }
+
+ public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
+ return intArrayToList(c.getIndexFieldPath()); // boxed List form of the column's int[] index path
+ }
+
+ public static List<Integer> intArrayToList(int[] arr) {
+ // boxed copy of arr, pre-sized to its length, with the elements in the same order
+ List<Integer> boxed = new ArrayList<Integer>(arr.length);
+ for (int index = 0; index < arr.length; index++) {
+ boxed.add(arr[index]);
+ }
+ return boxed;
+ }
+
+
+ @Override
+ public T getCurrentRecord() {
+ // Ask the predicate for its verdict on the current record before doing
+ // anything else with it.
+ boolean recordPasses = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate);
+
+ // The predicate is stateful: it is reset unconditionally here, whether or
+ // not the current record is kept.
+ IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
+
+ // a rejected record is reported as null, which signals a skip
+ if (!recordPasses) {
+ return null;
+ }
+ return delegate.getCurrentRecord();
+ }
+
+ @Override
+ public void skipCurrentRecord() {
+ delegate.skipCurrentRecord(); // forwarded as-is; the predicate is not touched here
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return rootConverter; // the filtering proxy built in the constructor, not the delegate's own root
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
new file mode 100644
index 0000000..606c78f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.io.api.Binary;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A rewritten version of a {@link org.apache.parquet.filter2.predicate.FilterPredicate} which receives
+ * the values for a record's columns one by one and internally tracks whether the predicate is
+ * satisfied, unsatisfied, or unknown.
+ *
+ * This is used to apply a predicate during record assembly, without assembling a second copy of
+ * a record, and without building a stack of update events.
+ *
+ * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is
+ * {@link org.apache.parquet.filter2.predicate.FilterPredicate}
+ */
+public interface IncrementallyUpdatedFilterPredicate {
+
+ /**
+ * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern.
+ */
+ public static interface Visitor {
+ boolean visit(ValueInspector p);
+ boolean visit(And and);
+ boolean visit(Or or);
+ }
+
+ /**
+ * An {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern.
+ */
+ boolean accept(Visitor visitor);
+
+ /**
+ * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents,
+ * and decides whether or not the predicate represented by this node is satisfied.
+ *
+ * It is stateful, and needs to be reset after use.
+ */
+ public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate {
+ // package private constructor
+ ValueInspector() { }
+
+ private boolean result = false;
+ private boolean isKnown = false;
+
+ // these methods signal what the value is; each one throws unless a subclass overrides it
+ public void updateNull() { throw new UnsupportedOperationException(); }
+ public void update(int value) { throw new UnsupportedOperationException(); }
+ public void update(long value) { throw new UnsupportedOperationException(); }
+ public void update(double value) { throw new UnsupportedOperationException(); }
+ public void update(float value) { throw new UnsupportedOperationException(); }
+ public void update(boolean value) { throw new UnsupportedOperationException(); }
+ public void update(Binary value) { throw new UnsupportedOperationException(); }
+
+ /**
+ * Reset to clear state and begin evaluating the next record.
+ */
+ public final void reset() {
+ isKnown = false;
+ result = false;
+ }
+
+ /**
+ * Subclasses should call this method to signal that the result of this predicate is known.
+ */
+ protected final void setResult(boolean result) {
+ if (isKnown) {
+ throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!"
+ + " Did you forget to call reset()?");
+ }
+ this.result = result;
+ this.isKnown = true;
+ }
+
+ /**
+ * Should only be called if {@link #isKnown()} returns true; throws IllegalStateException otherwise.
+ */
+ public final boolean getResult() {
+ if (!isKnown) {
+ throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!");
+ }
+ return result;
+ }
+
+ /**
+ * Returns true once a result has been set via {@link #setResult} since the last reset, false otherwise.
+ */
+ public final boolean isKnown() {
+ return isKnown;
+ }
+
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+
+ // base class for And / Or: holds the left and right operands, both passed through checkNotNull
+ static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
+ private final IncrementallyUpdatedFilterPredicate left;
+ private final IncrementallyUpdatedFilterPredicate right;
+
+ BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ this.left = checkNotNull(left, "left");
+ this.right = checkNotNull(right, "right");
+ }
+
+ public final IncrementallyUpdatedFilterPredicate getLeft() {
+ return left;
+ }
+
+ public final IncrementallyUpdatedFilterPredicate getRight() {
+ return right;
+ }
+ }
+
+ public static final class Or extends BinaryLogical {
+ Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ super(left, right);
+ }
+ // dispatches to the Visitor overload that takes an Or
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+
+ public static final class And extends BinaryLogical {
+ And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ super(left, right);
+ }
+ // dispatches to the Visitor overload that takes an And
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
new file mode 100644
index 0000000..8def88e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * The implementation of this abstract class is auto-generated by
+ * {@link org.apache.parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}
+ *
+ * Constructs an {@link IncrementallyUpdatedFilterPredicate} from a {@link org.apache.parquet.filter2.predicate.FilterPredicate}.
+ * This is how records are filtered during record assembly. The implementation is generated in order to avoid autoboxing.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator as this is not
+ * supported by this filter.
+ *
+ * the supplied predicate should first be run through {@link org.apache.parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * in a form that doesn't make use of the not() operator.
+ *
+ * the supplied predicate should also have already been run through
+ * {@link org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * TODO: UserDefinedPredicates still autobox however
+ */
+public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
+ private boolean built = false;
+ private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
+
+ public IncrementallyUpdatedFilterPredicateBuilderBase() { }
+
+ public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
+ checkArgument(!built, "This builder has already been used"); // single use: rejected once built is set
+ IncrementallyUpdatedFilterPredicate incremental = pred.accept(this); // this builder is the visitor
+ built = true; // set only after accept() returns, so a build that throws leaves the flag unset
+ return incremental;
+ }
+
+ protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
+ List<ValueInspector> existing = valueInspectorsByColumn.get(columnPath);
+ List<ValueInspector> target = (existing != null) ? existing : new ArrayList<ValueInspector>();
+ target.add(valueInspector);
+ if (existing == null) { // first inspector for this column: register the freshly created list
+ valueInspectorsByColumn.put(columnPath, target);
+ }
+ }
+
+ public Map<ColumnPath, List<ValueInspector>> getValueInspectorsByColumn() {
+ return valueInspectorsByColumn; // the live internal map that addValueInspector fills, not a copy
+ }
+
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(And and) {
+ return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));
+ }
+ // same shape as visit(And): both operands are converted by visiting them with this builder
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(Or or) {
+ return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this));
+ }
+ // not() is never converted: encountering one is reported as an IllegalArgumentException
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(Not not) {
+ throw new IllegalArgumentException(
+ "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..d1aa66c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not.
+ * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state
+ * represent columns with a null value, and updates them accordingly.
+ *
+ * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome
+ * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37)
+ */
+public class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor {
+ private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE = new IncrementallyUpdatedFilterPredicateEvaluator();
+
+ public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) {
+ checkNotNull(pred, "pred");
+ return pred.accept(INSTANCE);
+ }
+
+ private IncrementallyUpdatedFilterPredicateEvaluator() {}
+
+ @Override
+ public boolean visit(ValueInspector p) {
+ if (!p.isKnown()) {
+ p.updateNull();
+ }
+ return p.getResult();
+ }
+
+ @Override
+ public boolean visit(And and) {
+ return and.getLeft().accept(this) && and.getRight().accept(this);
+ }
+
+ @Override
+ public boolean visit(Or or) {
+ return or.getLeft().accept(this) || or.getRight().accept(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..a75731a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Resets all the {@link ValueInspector}s in a {@link IncrementallyUpdatedFilterPredicate}.
+ */
+public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor {
+ private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE = new IncrementallyUpdatedFilterPredicateResetter();
+
+ public static void reset(IncrementallyUpdatedFilterPredicate pred) {
+ checkNotNull(pred, "pred");
+ pred.accept(INSTANCE);
+ }
+
+ private IncrementallyUpdatedFilterPredicateResetter() { }
+
+ @Override
+ public boolean visit(ValueInspector p) {
+ p.reset();
+ return false;
+ }
+
+ @Override
+ public boolean visit(And and) {
+ and.getLeft().accept(this);
+ and.getRight().accept(this);
+ return false;
+ }
+
+ @Override
+ public boolean visit(Or or) {
+ or.getLeft().accept(this);
+ or.getRight().accept(this);
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
new file mode 100644
index 0000000..f2d88fc
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.apache.parquet.Log.DEBUG;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.io.RecordReaderImplementation.State;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+// TODO(julien): this class appears to be unused -- can it be nuked? - todd
+public abstract class BaseRecordReader<T> extends RecordReader<T> {
+ private static final Log LOG = Log.getLog(BaseRecordReader.class);
+
+ public RecordConsumer recordConsumer;
+ public RecordMaterializer<T> recordMaterializer;
+ public ColumnReadStore columnStore;
+ @Override
+ public T read() {
+ readOneRecord();
+ return recordMaterializer.getCurrentRecord();
+ }
+
+ protected abstract void readOneRecord();
+
+ State[] caseLookup;
+
+ private String endField;
+
+ private int endIndex;
+
+ protected void currentLevel(int currentLevel) {
+ if (DEBUG) LOG.debug("currentLevel: "+currentLevel);
+ }
+
+ protected void log(String message) {
+ if (DEBUG) LOG.debug("bc: "+message);
+ }
+
+ final protected int getCaseId(int state, int currentLevel, int d, int nextR) {
+ return caseLookup[state].getCase(currentLevel, d, nextR).getID();
+ }
+
+ final protected void startMessage() {
+ // reset state
+ endField = null;
+ if (DEBUG) LOG.debug("startMessage()");
+ recordConsumer.startMessage();
+ }
+
+ final protected void startGroup(String field, int index) {
+ startField(field, index);
+ if (DEBUG) LOG.debug("startGroup()");
+ recordConsumer.startGroup();
+ }
+
+ private void startField(String field, int index) {
+ if (DEBUG) LOG.debug("startField("+field+","+index+")");
+ if (endField != null && index == endIndex) {
+ // skip the close/open tag
+ endField = null;
+ } else {
+ if (endField != null) {
+ // close the previous field
+ recordConsumer.endField(endField, endIndex);
+ endField = null;
+ }
+ recordConsumer.startField(field, index);
+ }
+ }
+
+ final protected void addPrimitiveINT64(String field, int index, long value) {
+ startField(field, index);
+ if (DEBUG) LOG.debug("addLong("+value+")");
+ recordConsumer.addLong(value);
+ endField(field, index);
+ }
+
+ private void endField(String field, int index) {
+ if (DEBUG) LOG.debug("endField("+field+","+index+")");
+ if (endField != null) {
+ recordConsumer.endField(endField, endIndex);
+ }
+ endField = field;
+ endIndex = index;
+ }
+
+ final protected void addPrimitiveBINARY(String field, int index, Binary value) {
+ startField(field, index);
+ if (DEBUG) LOG.debug("addBinary("+value+")");
+ recordConsumer.addBinary(value);
+ endField(field, index);
+ }
+
+ final protected void addPrimitiveINT32(String field, int index, int value) {
+ startField(field, index);
+ if (DEBUG) LOG.debug("addInteger("+value+")");
+ recordConsumer.addInteger(value);
+ endField(field, index);
+ }
+
+ final protected void endGroup(String field, int index) {
+ if (endField != null) {
+ // close the previous field
+ recordConsumer.endField(endField, endIndex);
+ endField = null;
+ }
+ if (DEBUG) LOG.debug("endGroup()");
+ recordConsumer.endGroup();
+ endField(field, index);
+ }
+
+ final protected void endMessage() {
+ if (endField != null) {
+ // close the previous field
+ recordConsumer.endField(endField, endIndex);
+ endField = null;
+ }
+ if (DEBUG) LOG.debug("endMessage()");
+ recordConsumer.endMessage();
+ }
+
+ protected void error(String message) {
+ throw new ParquetDecodingException(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
new file mode 100644
index 0000000..95a969e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+/**
+ * a structure used to serialize and deserialize records
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class ColumnIO {
+
+ static final boolean DEBUG = Log.DEBUG;
+
+ private final GroupColumnIO parent;
+ private final Type type;
+ private final String name;
+ private final int index;
+ private int repetitionLevel;
+ private int definitionLevel;
+ private String[] fieldPath;
+ private int[] indexFieldPath;
+
+
+ ColumnIO(Type type, GroupColumnIO parent, int index) {
+ this.type = type;
+ this.parent = parent;
+ this.index = index;
+ this.name = type.getName();
+ }
+
+ String[] getFieldPath() {
+ return fieldPath;
+ }
+
+ public String getFieldPath(int level) {
+ return fieldPath[level];
+ }
+
+ public int[] getIndexFieldPath() {
+ return indexFieldPath;
+ }
+
+ public int getIndexFieldPath(int level) {
+ return indexFieldPath[level];
+ }
+
+ public int getIndex() {
+ return this.index;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ int getRepetitionLevel() {
+ return repetitionLevel;
+ }
+
+ int getDefinitionLevel() {
+ return definitionLevel;
+ }
+
+ void setRepetitionLevel(int repetitionLevel) {
+ this.repetitionLevel = repetitionLevel;
+ }
+
+ void setDefinitionLevel(int definitionLevel) {
+ this.definitionLevel = definitionLevel;
+ }
+
+ void setFieldPath(String[] fieldPath, int[] indexFieldPath) {
+ this.fieldPath = fieldPath;
+ this.indexFieldPath = indexFieldPath;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+ setRepetitionLevel(r);
+ setDefinitionLevel(d);
+ setFieldPath(fieldPath, indexFieldPath);
+ }
+
+ abstract List<String[]> getColumnNames();
+
+ public GroupColumnIO getParent() {
+ return parent;
+ }
+
+ abstract PrimitiveColumnIO getLast();
+ abstract PrimitiveColumnIO getFirst();
+
+ ColumnIO getParent(int r) {
+ if (getRepetitionLevel() == r && getType().isRepetition(Repetition.REPEATED)) {
+ return this;
+ } else if (getParent()!=null && getParent().getDefinitionLevel()>=r) {
+ return getParent().getParent(r);
+ } else {
+ throw new InvalidRecordException("no parent("+r+") for "+Arrays.toString(this.getFieldPath()));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName()+" "+type.getName()
+ +" r:"+repetitionLevel
+ +" d:"+definitionLevel
+ +" "+Arrays.toString(fieldPath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
new file mode 100644
index 0000000..71af780
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.TypeVisitor;
+
+/**
+ * Factory constructing the ColumnIO structure from the schema
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnIOFactory {
+
+ public class ColumnIOCreatorVisitor implements TypeVisitor {
+
+ private MessageColumnIO columnIO;
+ private GroupColumnIO current;
+ private List<PrimitiveColumnIO> leaves = new ArrayList<PrimitiveColumnIO>();
+ private final boolean validating;
+ private final MessageType requestedSchema;
+ private int currentRequestedIndex;
+ private Type currentRequestedType;
+ private boolean strictTypeChecking;
+
+ public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
+ this(validating, requestedSchema, true);
+ }
+
+ public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
+ this.validating = validating;
+ this.requestedSchema = requestedSchema;
+ this.strictTypeChecking = strictTypeChecking;
+ }
+
+ @Override
+ public void visit(MessageType messageType) {
+ columnIO = new MessageColumnIO(requestedSchema, validating);
+ visitChildren(columnIO, messageType, requestedSchema);
+ columnIO.setLevels();
+ columnIO.setLeaves(leaves);
+ }
+
+ @Override
+ public void visit(GroupType groupType) {
+ if (currentRequestedType.isPrimitive()) {
+ incompatibleSchema(groupType, currentRequestedType);
+ }
+ GroupColumnIO newIO = new GroupColumnIO(groupType, current, currentRequestedIndex);
+ current.add(newIO);
+ visitChildren(newIO, groupType, currentRequestedType.asGroupType());
+ }
+
+ private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType requestedGroupType) {
+ GroupColumnIO oldIO = current;
+ current = newIO;
+ for (Type type : groupType.getFields()) {
+ // if the file schema does not contain the field it will just stay null
+ if (requestedGroupType.containsField(type.getName())) {
+ currentRequestedIndex = requestedGroupType.getFieldIndex(type.getName());
+ currentRequestedType = requestedGroupType.getType(currentRequestedIndex);
+ if (currentRequestedType.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
+ incompatibleSchema(type, currentRequestedType);
+ }
+ type.accept(this);
+ }
+ }
+ current = oldIO;
+ }
+
+ @Override
+ public void visit(PrimitiveType primitiveType) {
+ if (!currentRequestedType.isPrimitive() ||
+ (this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
+ incompatibleSchema(primitiveType, currentRequestedType);
+ }
+ PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
+ current.add(newIO);
+ leaves.add(newIO);
+ }
+
+ private void incompatibleSchema(Type fileType, Type requestedType) {
+ throw new ParquetDecodingException("The requested schema is not compatible with the file schema. incompatible types: " + requestedType + " != " + fileType);
+ }
+
+ public MessageColumnIO getColumnIO() {
+ return columnIO;
+ }
+
+ }
+
+ private final boolean validating;
+
+ /**
+ * validation is off by default
+ */
+ public ColumnIOFactory() {
+ this(false);
+ }
+
+ /**
+ * @param validating to turn validation on
+ */
+ public ColumnIOFactory(boolean validating) {
+ super();
+ this.validating = validating;
+ }
+
+ /**
+ * @param requestedSchema the schema we want to read/write
+ * @param fileSchema the file schema (when reading it can be different from the requested schema)
+ * @return the corresponding serializing/deserializing structure
+ */
+ public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
+ return getColumnIO(requestedSchema, fileSchema, true);
+ }
+
+ /**
+ * @param requestedSchema the schema we want to read/write
+ * @param fileSchema the file schema (when reading it can be different from the requested schema)
+ * @param strict should file type and requested primitive types match
+ * @return the corresponding serializing/deserializing structure
+ */
+ public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
+ ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
+ fileSchema.accept(visitor);
+ return visitor.getColumnIO();
+ }
+
+ /**
+ * @param schema the schema we want to read/write
+ * @return the corresponding serializing/deserializing structure
+ */
+ public MessageColumnIO getColumnIO(MessageType schema) {
+ return this.getColumnIO(schema, schema);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
new file mode 100644
index 0000000..e15ab2e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when a problem occurred while compiling the column reader
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class CompilationException extends ParquetRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public CompilationException() {
+ }
+
+ public CompilationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CompilationException(String message) {
+ super(message);
+ }
+
+ public CompilationException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
new file mode 100644
index 0000000..671c651
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+/**
+ * used to read empty schema
+ *
+ * @author Mickael Lacour <m....@criteo.com>
+ *
+ * @param <T> the type of the materialized record
+ */
+class EmptyRecordReader<T> extends RecordReader<T> {
+
+ private final GroupConverter recordConsumer;
+ private final RecordMaterializer<T> recordMaterializer;
+
+ public EmptyRecordReader(RecordMaterializer<T> recordMaterializer) {
+ this.recordMaterializer = recordMaterializer;
+ this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
+ }
+
+ /**
+ * @see org.apache.parquet.io.RecordReader#read()
+ */
+ @Override
+ public T read() {
+ recordConsumer.start();
+ recordConsumer.end();
+ return recordMaterializer.getCurrentRecord();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
new file mode 100644
index 0000000..3444b1f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+/**
+ * Record reader that applies a {@link RecordFilter} while reading: before each record is
+ * read, the records rejected by the filter are skipped directly on the column readers.
+ *
+ * @author Jacob Metcalf
+ *
+ * @param <T> the type of the materialized record
+ */
+class FilteredRecordReader<T> extends RecordReaderImplementation<T> {
+
+  private final RecordFilter recordFilter;
+  private final long recordCount;
+  private long recordsRead = 0;
+
+  /**
+   * @param root the root of the schema
+   * @param recordMaterializer handed to the parent reader
+   * @param validating handed to the parent reader
+   * @param columnStore handed to the parent reader
+   * @param unboundFilter filter to apply to the records; may be null to leave them unfiltered
+   * @param recordCount number of records (returned or skipped) after which {@link #read()} returns null
+   */
+  public FilteredRecordReader(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating,
+      ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter, long recordCount) {
+    super(root, recordMaterializer, validating, columnStore);
+    this.recordCount = recordCount;
+    if (unboundFilter != null) {
+      recordFilter = unboundFilter.bind(getColumnReaders());
+    } else {
+      recordFilter = null;
+    }
+  }
+
+  /**
+   * Skips the records rejected by the filter, then reads the next record.
+   *
+   * @return the next matching record, or null once recordCount records have been consumed
+   */
+  @Override
+  public T read() {
+    skipToMatch();
+    if (recordsRead == recordCount) {
+      return null;
+    }
+    ++ recordsRead;
+    return super.read();
+  }
+
+  // FilteredRecordReader skips forwards itself, it never asks the layer above to do the skipping for it.
+  // This is different from how filtering is handled in the filter2 API
+  @Override
+  public boolean shouldSkipCurrentRecord() {
+    return false;
+  }
+
+  /**
+   * Skips forwards until the filter matches the current record or recordCount records
+   * have been consumed. Does nothing when no filter is bound.
+   */
+  private void skipToMatch() {
+    if (recordFilter == null) {
+      // no filter was supplied: every record matches, so there is nothing to skip
+      return;
+    }
+    while (recordsRead < recordCount && !recordFilter.isMatch()) {
+      State currentState = getState(0);
+      do {
+        ColumnReader columnReader = currentState.column;
+
+        // skip the stored value only when the column is at its maximum definition level
+        if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) {
+          columnReader.skip();
+        }
+        columnReader.consume();
+
+        // Based on repetition level work out next state to go to
+        int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
+        currentState = currentState.getNextState(nextR);
+      } while (currentState != null);
+      ++ recordsRead;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
new file mode 100644
index 0000000..1efe0d1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.schema.GroupType;
+
+/**
+ * Group level of the IO structure
+ *
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GroupColumnIO extends ColumnIO {
+ private static final Log LOG = Log.getLog(GroupColumnIO.class);
+
+ private final Map<String, ColumnIO> childrenByName = new HashMap<String, ColumnIO>();
+ private final List<ColumnIO> children = new ArrayList<ColumnIO>();
+ private int childrenSize = 0;
+
+ GroupColumnIO(GroupType groupType, GroupColumnIO parent, int index) {
+ super(groupType, parent, index);
+ }
+
+ void add(ColumnIO child) {
+ children.add(child);
+ childrenByName.put(child.getType().getName(), child);
+ ++ childrenSize;
+ }
+
+ @Override
+ void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+ super.setLevels(r, d, fieldPath, indexFieldPath, repetition, path);
+ for (ColumnIO child : this.children) {
+ String[] newFieldPath = Arrays.copyOf(fieldPath, fieldPath.length + 1);
+ int[] newIndexFieldPath = Arrays.copyOf(indexFieldPath, indexFieldPath.length + 1);
+ newFieldPath[fieldPath.length] = child.getType().getName();
+ newIndexFieldPath[indexFieldPath.length] = child.getIndex();
+ List<ColumnIO> newRepetition;
+ if (child.getType().isRepetition(REPEATED)) {
+ newRepetition = new ArrayList<ColumnIO>(repetition);
+ newRepetition.add(child);
+ } else {
+ newRepetition = repetition;
+ }
+ List<ColumnIO> newPath = new ArrayList<ColumnIO>(path);
+ newPath.add(child);
+ child.setLevels(
+ // the type repetition level increases whenever there's a possible repetition
+ child.getType().isRepetition(REPEATED) ? r + 1 : r,
+ // the type definition level increases whenever a field can be missing (not required)
+ !child.getType().isRepetition(REQUIRED) ? d + 1 : d,
+ newFieldPath,
+ newIndexFieldPath,
+ newRepetition,
+ newPath
+ );
+
+ }
+ }
+
+ @Override
+ List<String[]> getColumnNames() {
+ ArrayList<String[]> result = new ArrayList<String[]>();
+ for (ColumnIO c : children) {
+ result.addAll(c.getColumnNames());
+ }
+ return result;
+ }
+
+ PrimitiveColumnIO getLast() {
+ return children.get(children.size()-1).getLast();
+ }
+
+ PrimitiveColumnIO getFirst() {
+ return children.get(0).getFirst();
+ }
+
+ public ColumnIO getChild(String name) {
+ return childrenByName.get(name);
+ }
+
+ public ColumnIO getChild(int fieldIndex) {
+ try {
+ return children.get(fieldIndex);
+ } catch (IndexOutOfBoundsException e) {
+ throw new InvalidRecordException("could not get child " + fieldIndex + " from " + children, e);
+ }
+ }
+
+ public int getChildrenCount() {
+ return childrenSize;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
new file mode 100644
index 0000000..d3d0111
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when an invalid record is encountered
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class InvalidRecordException extends ParquetRuntimeException {
+ // fixed serialization version id for this exception type
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with no message and no cause.
+ */
+ public InvalidRecordException() {
+ super();
+ }
+
+ /**
+ * @param message description of the problem
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public InvalidRecordException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message description of the problem
+ */
+ public InvalidRecordException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public InvalidRecordException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
new file mode 100644
index 0000000..e24aedb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.FilteringRecordMaterializer;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateBuilder;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Message level of the IO structure
+ *
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class MessageColumnIO extends GroupColumnIO {
+ private static final Log logger = Log.getLog(MessageColumnIO.class);
+
+ // cached copy of the global debug switch; guards every debug log/printState call below
+ private static final boolean DEBUG = Log.DEBUG;
+
+ // leaf (primitive) columns of this message; not set by the constructor, only through setLeaves(List)
+ private List<PrimitiveColumnIO> leaves;
+
+ // passed to the record readers and, when true, makes getRecordWriter wrap its consumer in a ValidatingRecordConsumer
+ private final boolean validating;
+
+ /**
+ * @param messageType the message schema this IO structure is built for
+ * @param validating whether readers/writers obtained from this object validate records
+ */
+ MessageColumnIO(MessageType messageType, boolean validating) {
+ // NOTE(review): the null and 0 are presumably "no parent" and "index 0" -- confirm against the GroupColumnIO constructor
+ super(messageType, null, 0);
+ this.validating = validating;
+ }
+
+ /**
+ * Public entry point that simply returns the superclass result.
+ *
+ * @return the column names as computed by {@code super.getColumnNames()}
+ */
+ public List<String[]> getColumnNames() {
+ return super.getColumnNames();
+ }
+
+ /**
+ * Creates a record reader without any filtering; equivalent to calling the
+ * three-argument overload with {@code FilterCompat.NOOP}.
+ *
+ * @param columns the page store to read from
+ * @param recordMaterializer turns the read events into records of type T
+ * @return a reader over the records in {@code columns}
+ */
+ public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+ RecordMaterializer<T> recordMaterializer) {
+ return getRecordReader(columns, recordMaterializer, FilterCompat.NOOP);
+ }
+
+ /**
+ * @deprecated use {@link #getRecordReader(PageReadStore, RecordMaterializer, Filter)}
+ */
+ @Deprecated
+ public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+ RecordMaterializer<T> recordMaterializer,
+ UnboundRecordFilter filter) {
+ // adapt the legacy filter to the Filter abstraction and reuse the main overload
+ return getRecordReader(columns, recordMaterializer, FilterCompat.get(filter));
+ }
+
+ /**
+ * Creates a record reader whose concrete implementation depends on the kind
+ * of filter supplied. All three arguments are null-checked with
+ * {@code checkNotNull}. When there are no leaf columns an
+ * {@code EmptyRecordReader} is returned and the filter is not consulted.
+ * NOTE(review): {@code leaves} is dereferenced without a null check, so
+ * {@code setLeaves} must have been called before this method -- confirm callers.
+ *
+ * @param columns the page store to read from
+ * @param recordMaterializer turns the read events into records of type T
+ * @param filter the filter to apply; dispatched through its visitor
+ * @return the reader selected for the given filter
+ */
+ public <T> RecordReader<T> getRecordReader(final PageReadStore columns,
+ final RecordMaterializer<T> recordMaterializer,
+ final Filter filter) {
+ checkNotNull(columns, "columns");
+ checkNotNull(recordMaterializer, "recordMaterializer");
+ checkNotNull(filter, "filter");
+
+ if (leaves.isEmpty()) {
+ return new EmptyRecordReader<T>(recordMaterializer);
+ }
+
+ return filter.accept(new Visitor<RecordReader<T>>() {
+ // predicate-based filter: wrap the materializer so the predicate is evaluated while the record is assembled
+ @Override
+ public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) {
+
+ FilterPredicate predicate = filterPredicateCompat.getFilterPredicate();
+ IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder();
+ IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate);
+ RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>(
+ recordMaterializer,
+ leaves,
+ builder.getValueInspectorsByColumn(),
+ streamingPredicate);
+
+ // the column read store is built on the filtering materializer's root converter, not the caller's
+ return new RecordReaderImplementation<T>(
+ MessageColumnIO.this,
+ filteringRecordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType()));
+ }
+
+ // legacy unbound record filter: handled by FilteredRecordReader, which also receives the row count
+ @Override
+ public RecordReader<T> visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+ return new FilteredRecordReader<T>(
+ MessageColumnIO.this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+ unboundRecordFilterCompat.getUnboundRecordFilter(),
+ columns.getRowCount()
+ );
+
+ }
+
+ // no filter: plain reader using the caller's materializer directly
+ @Override
+ public RecordReader<T> visit(NoOpFilter noOpFilter) {
+ return new RecordReaderImplementation<T>(
+ MessageColumnIO.this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+ }
+ });
+ }
+
+ /**
+ * RecordConsumer that forwards the values of a record to one ColumnWriter per
+ * leaf column. It tracks its position in the schema ({@code currentColumnIO}),
+ * the group nesting depth ({@code currentLevel}), the repetition level to use
+ * for the next value at each depth ({@code r}), and which fields were written
+ * at each depth so that nulls can be emitted for the ones that were not.
+ */
+ private class MessageColumnIORecordConsumer extends RecordConsumer {
+ // schema node the consumer is currently positioned on
+ private ColumnIO currentColumnIO;
+ // group nesting depth: incremented by startGroup, decremented by endGroup
+ private int currentLevel = 0;
+
+ /**
+ * Set of field indexes already written at one nesting depth, backed by a BitSet.
+ */
+ private class FieldsMarker {
+ private BitSet vistedIndexes = new BitSet();
+
+ @Override
+ public String toString() {
+ return "VistedIndex{" +
+ "vistedIndexes=" + vistedIndexes +
+ '}';
+ }
+
+ // clears the bits in [0, fieldsCount); bits at or beyond fieldsCount are left untouched
+ public void reset(int fieldsCount) {
+ this.vistedIndexes.clear(0, fieldsCount);
+ }
+
+ public void markWritten(int i) {
+ vistedIndexes.set(i);
+ }
+
+ public boolean isWritten(int i) {
+ return vistedIndexes.get(i);
+ }
+ }
+
+ //track at each level of depth, which fields are written, so nulls can be inserted for the unwritten fields
+ private final FieldsMarker[] fieldsWritten;
+ // r[level] is the repetition level passed to the writer for the next value written at that depth
+ private final int[] r;
+ // one writer per leaf column, indexed by PrimitiveColumnIO.getId()
+ private final ColumnWriter[] columnWriter;
+ private final ColumnWriteStore columns;
+ // true from startField until a value is added or a group is ended; endField rejects fields that stay empty
+ private boolean emptyField = true;
+
+ /**
+ * Obtains a ColumnWriter for every leaf of the enclosing MessageColumnIO and
+ * sizes the per-depth bookkeeping arrays from the longest leaf field path.
+ *
+ * @param columns the store that provides the column writers and is told when a record ends
+ */
+ public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
+ this.columns = columns;
+ int maxDepth = 0;
+ this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
+ for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
+ maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length);
+ // NOTE(review): assumes leaf ids are within [0, number of leaves) -- confirm where ids are assigned
+ columnWriter[primitiveColumnIO.getId()] = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
+ }
+
+ fieldsWritten = new FieldsMarker[maxDepth];
+ for (int i = 0; i < maxDepth; i++) {
+ fieldsWritten[i] = new FieldsMarker();
+ }
+ r = new int[maxDepth];
+ }
+
+ /**
+ * Logs the current depth, written-fields marker, field path and repetition
+ * level, then checks that the tracked repetition level does not exceed the
+ * one of the current schema node.
+ *
+ * @throws InvalidRecordException if r[currentLevel] is greater than the schema repetition level
+ */
+ public void printState() {
+ log(currentLevel + ", " + fieldsWritten[currentLevel] + ": " + Arrays.toString(currentColumnIO.getFieldPath()) + " r:" + r[currentLevel]);
+ if (r[currentLevel] > currentColumnIO.getRepetitionLevel()) {
+ // sanity check
+ throw new InvalidRecordException(r[currentLevel] + "(r) > " + currentColumnIO.getRepetitionLevel() + " ( schema r)");
+ }
+ }
+
+ // writes m at debug level, prefixed with one indent unit per nesting level
+ private void log(Object m) {
+ String indent = "";
+ for (int i = 0; i<currentLevel; ++i) {
+ indent += " ";
+ }
+ logger.debug(indent + m);
+ }
+
+ /**
+ * Positions the consumer on the message root and resets the level-0 state.
+ * NOTE(review): currentLevel itself is not reset here; this presumably relies
+ * on startGroup/endGroup calls having been balanced in the previous record -- confirm.
+ */
+ @Override
+ public void startMessage() {
+ if (DEBUG) log("< MESSAGE START >");
+ currentColumnIO = MessageColumnIO.this;
+ r[0] = 0;
+ int numberOfFieldsToVisit = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+ fieldsWritten[0].reset(numberOfFieldsToVisit);
+ if (DEBUG) printState();
+ }
+
+ /**
+ * Emits nulls for the fields not written at the current level and tells the
+ * column store that the record is complete.
+ */
+ @Override
+ public void endMessage() {
+ writeNullForMissingFieldsAtCurrentLevel();
+ columns.endRecord();
+ if (DEBUG) log("< MESSAGE END >");
+ if (DEBUG) printState();
+ }
+
+ /**
+ * Moves to the child at {@code index} of the current group and marks the field as empty so far.
+ *
+ * @param field field name, used only for logging and error messages here
+ * @param index index of the child within the current group
+ * @throws ParquetEncodingException wrapping any RuntimeException raised while moving to the child
+ */
+ @Override
+ public void startField(String field, int index) {
+ try {
+ if (DEBUG) log("startField(" + field + ", " + index + ")");
+ currentColumnIO = ((GroupColumnIO)currentColumnIO).getChild(index);
+ emptyField = true;
+ if (DEBUG) printState();
+ } catch (RuntimeException e) {
+ throw new ParquetEncodingException("error starting field " + field + " at " + index, e);
+ }
+ }
+
+ /**
+ * Moves back to the parent node, records the field as written at the current
+ * level and restores the repetition level of the enclosing level.
+ *
+ * @param field field name, used only for logging here
+ * @param index index of the field to mark as written
+ * @throws ParquetEncodingException if no value was added and no group ended since startField
+ */
+ @Override
+ public void endField(String field, int index) {
+ if (DEBUG) log("endField(" + field + ", " + index + ")");
+ // note: the move to the parent happens before the empty-field check below
+ currentColumnIO = currentColumnIO.getParent();
+ if (emptyField) {
+ throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
+ }
+ fieldsWritten[currentLevel].markWritten(index);
+ // the next field at this depth starts again from the enclosing level's repetition level (0 at the root)
+ r[currentLevel] = currentLevel == 0 ? 0 : r[currentLevel - 1];
+ if (DEBUG) printState();
+ }
+
+ /**
+ * For every child of the current group that is not marked as written at the
+ * current level, writes nulls using r[currentLevel] and the definition level
+ * of the current group.
+ *
+ * @throws ParquetEncodingException wrapping any RuntimeException raised while writing a null
+ */
+ private void writeNullForMissingFieldsAtCurrentLevel() {
+ int currentFieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+ for (int i = 0; i < currentFieldsCount; i++) {
+ if (!fieldsWritten[currentLevel].isWritten(i)) {
+ try {
+ ColumnIO undefinedField = ((GroupColumnIO)currentColumnIO).getChild(i);
+ // definition level of the enclosing group, not of the missing field itself
+ int d = currentColumnIO.getDefinitionLevel();
+ if (DEBUG)
+ log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + r[currentLevel] + "," + d + ")");
+ writeNull(undefinedField, r[currentLevel], d);
+ } catch (RuntimeException e) {
+ throw new ParquetEncodingException("error while writing nulls for fields of indexes " + i + " . current index: " + fieldsWritten[currentLevel], e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes a null with the given levels to the writer of a primitive field, or
+ * recursively to every descendant primitive of a group field.
+ *
+ * @param undefinedField the field that has no value
+ * @param r repetition level to record (shadows the r array field)
+ * @param d definition level to record
+ */
+ private void writeNull(ColumnIO undefinedField, int r, int d) {
+ if (undefinedField.getType().isPrimitive()) {
+ columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d);
+ } else {
+ GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
+ int childrenCount = groupColumnIO.getChildrenCount();
+ for (int i = 0; i < childrenCount; i++) {
+ writeNull(groupColumnIO.getChild(i), r, d);
+ }
+ }
+ }
+
+ // after a value or group is written, further ones in the same field use the current node's repetition level
+ private void setRepetitionLevel() {
+ r[currentLevel] = currentColumnIO.getRepetitionLevel();
+ if (DEBUG) log("r: " + r[currentLevel]);
+ }
+
+ /**
+ * Enters one nesting level: the new level inherits the repetition level of
+ * the enclosing one and its written-fields marker is cleared for the
+ * children of the current group.
+ */
+ @Override
+ public void startGroup() {
+ if (DEBUG) log("startGroup()");
+
+ ++ currentLevel;
+ r[currentLevel] = r[currentLevel - 1];
+
+ int fieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+ fieldsWritten[currentLevel].reset(fieldsCount);
+ if (DEBUG) printState();
+ }
+
+ /**
+ * Leaves the current nesting level: the enclosing field is no longer
+ * considered empty, nulls are written for the group's unwritten fields, and
+ * the repetition level of the outer level is updated.
+ */
+ @Override
+ public void endGroup() {
+ if (DEBUG) log("endGroup()");
+ emptyField = false;
+ // must run before the level is decremented: it reads fieldsWritten/r of the group's own level
+ writeNullForMissingFieldsAtCurrentLevel();
+ -- currentLevel;
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ // writer of the current node; the cast requires currentColumnIO to be a PrimitiveColumnIO
+ private ColumnWriter getColumnWriter() {
+ return columnWriter[((PrimitiveColumnIO)currentColumnIO).getId()];
+ }
+
+ // Each add* method below follows the same sequence: mark the field non-empty, write the
+ // value with r[currentLevel] and the current node's definition level, then update the
+ // repetition level for any further value of the same field.
+
+ @Override
+ public void addInteger(int value) {
+ if (DEBUG) log("addInt(" + value + ")");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ @Override
+ public void addLong(long value) {
+ if (DEBUG) log("addLong(" + value + ")");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ if (DEBUG) log("addBoolean(" + value + ")");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ // only the length is logged, not the bytes themselves
+ if (DEBUG) log("addBinary(" + value.length() + " bytes)");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ @Override
+ public void addFloat(float value) {
+ if (DEBUG) log("addFloat(" + value + ")");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ @Override
+ public void addDouble(double value) {
+ if (DEBUG) log("addDouble(" + value + ")");
+ emptyField = false;
+ getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+ setRepetitionLevel();
+ if (DEBUG) printState();
+ }
+
+ }
+
+ /**
+ * Creates a RecordConsumer that writes records into the given column store.
+ * When DEBUG is on the consumer is wrapped in a RecordConsumerLoggingWrapper;
+ * when this object was created with validating=true the result is further
+ * wrapped in a ValidatingRecordConsumer built with this message's type.
+ *
+ * @param columns the store providing the column writers
+ * @return the (possibly wrapped) record consumer
+ */
+ public RecordConsumer getRecordWriter(ColumnWriteStore columns) {
+ RecordConsumer recordWriter = new MessageColumnIORecordConsumer(columns);
+ if (DEBUG) recordWriter = new RecordConsumerLoggingWrapper(recordWriter);
+ return validating ? new ValidatingRecordConsumer(recordWriter, getType()) : recordWriter;
+ }
+
+ /**
+ * Starts the level computation at the root: repetition and definition level 0,
+ * empty field paths, and this object as the only entry of both ColumnIO lists.
+ */
+ void setLevels() {
+ setLevels(0, 0, new String[0], new int[0], Arrays.<ColumnIO>asList(this), Arrays.<ColumnIO>asList(this));
+ }
+
+ // stores the given list reference as-is (no copy)
+ void setLeaves(List<PrimitiveColumnIO> leaves) {
+ this.leaves = leaves;
+ }
+
+ /**
+ * @return the list given to setLeaves, returned directly without copying
+ */
+ public List<PrimitiveColumnIO> getLeaves() {
+ return this.leaves;
+ }
+
+ /**
+ * @return the type of this node, cast to MessageType
+ */
+ @Override
+ public MessageType getType() {
+ return (MessageType)super.getType();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
new file mode 100644
index 0000000..1007e32
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when a decoding problem occurred
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetDecodingException extends ParquetRuntimeException {
+ // fixed serialization version id for this exception type
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with no message and no cause.
+ */
+ public ParquetDecodingException() {
+ }
+
+ /**
+ * @param message description of the problem
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public ParquetDecodingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message description of the problem
+ */
+ public ParquetDecodingException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public ParquetDecodingException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java b/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
new file mode 100644
index 0000000..05f9c56
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when an encoding problem occurred
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetEncodingException extends ParquetRuntimeException {
+ // fixed serialization version id for this exception type
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with no message and no cause.
+ */
+ public ParquetEncodingException() {
+ }
+
+ /**
+ * @param message description of the problem
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public ParquetEncodingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param message description of the problem
+ */
+ public ParquetEncodingException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying exception, passed on to the superclass
+ */
+ public ParquetEncodingException(Throwable cause) {
+ super(cause);
+ }
+
+}