You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:39 UTC
[42/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
new file mode 100644
index 0000000..9d94439
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Tracks the minimum, maximum and null count of a column of {@code double} values.
+ */
+public class DoubleStatistics extends Statistics<Double> {
+
+  private double max;
+  private double min;
+
+  /**
+   * Folds one non-null value into the statistics; the first value seeds both min and max.
+   */
+  @Override
+  public void updateStats(double value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  /**
+   * Merges the min/max of another DoubleStatistics into this one.
+   * Expected to be called from Statistics.mergeStatistics, which has already verified
+   * that both objects are of the same class.
+   */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    DoubleStatistics doubleStats = (DoubleStatistics) stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(doubleStats.getMin(), doubleStats.getMax());
+    } else {
+      updateStats(doubleStats.getMin(), doubleStats.getMax());
+    }
+  }
+
+  /**
+   * Restores min and max from the encodings produced by getMinBytes/getMaxBytes
+   * (the bit pattern of {@link Double#doubleToLongBits}) and marks the statistics
+   * as holding non-null values.
+   */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Double.longBitsToDouble(BytesUtils.bytesToLong(maxBytes));
+    min = Double.longBitsToDouble(BytesUtils.bytesToLong(minBytes));
+    this.markAsNotEmpty();
+  }
+
+  /** @return the max value encoded via {@link Double#doubleToLongBits} */
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.longToBytes(Double.doubleToLongBits(max));
+  }
+
+  /** @return the min value encoded via {@link Double#doubleToLongBits} */
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.longToBytes(Double.doubleToLongBits(min));
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue()) {
+      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
+    } else if (!this.isEmpty()) {
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    } else {
+      return "no stats for this column";
+    }
+  }
+
+  /**
+   * Widens the tracked range so that it covers {@code [min_value, max_value]}.
+   *
+   * <p>Values are ordered with {@link Double#compare}, the same total order as
+   * {@link Double#compareTo} (the Comparable ordering of the boxed values returned by
+   * {@link #genericGetMin()} / {@link #genericGetMax()}): NaN sorts above every other
+   * value and -0.0 sorts below 0.0. The primitive {@code <} / {@code >} operators are
+   * false for every comparison involving NaN, so with them a NaN seen first would
+   * become both min and max and freeze the statistics, making the result depend on
+   * the order in which values arrive.
+   */
+  public void updateStats(double min_value, double max_value) {
+    if (Double.compare(min_value, min) < 0) {
+      min = min_value;
+    }
+    if (Double.compare(max_value, max) > 0) {
+      max = max_value;
+    }
+  }
+
+  /**
+   * Overwrites min and max and marks the statistics as holding non-null values.
+   */
+  public void initializeStats(double min_value, double max_value) {
+    min = min_value;
+    max = max_value;
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public Double genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Double genericGetMax() {
+    return max;
+  }
+
+  public double getMax() {
+    return max;
+  }
+
+  public double getMin() {
+    return min;
+  }
+
+  /**
+   * Sets min and max directly and marks the statistics as holding non-null values.
+   */
+  public void setMinMax(double min, double max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
new file mode 100644
index 0000000..c164cf5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Tracks the minimum, maximum and null count of a column of {@code float} values.
+ */
+public class FloatStatistics extends Statistics<Float> {
+
+  private float max;
+  private float min;
+
+  /**
+   * Folds one non-null value into the statistics; the first value seeds both min and max.
+   */
+  @Override
+  public void updateStats(float value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  /**
+   * Merges the min/max of another FloatStatistics into this one.
+   * Expected to be called from Statistics.mergeStatistics, which has already verified
+   * that both objects are of the same class.
+   */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    FloatStatistics floatStats = (FloatStatistics) stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(floatStats.getMin(), floatStats.getMax());
+    } else {
+      updateStats(floatStats.getMin(), floatStats.getMax());
+    }
+  }
+
+  /**
+   * Restores min and max from the encodings produced by getMinBytes/getMaxBytes
+   * (the bit pattern of {@link Float#floatToIntBits}) and marks the statistics
+   * as holding non-null values.
+   */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Float.intBitsToFloat(BytesUtils.bytesToInt(maxBytes));
+    min = Float.intBitsToFloat(BytesUtils.bytesToInt(minBytes));
+    this.markAsNotEmpty();
+  }
+
+  /** @return the max value encoded via {@link Float#floatToIntBits} */
+  @Override
+  public byte[] getMaxBytes() {
+    return BytesUtils.intToBytes(Float.floatToIntBits(max));
+  }
+
+  /** @return the min value encoded via {@link Float#floatToIntBits} */
+  @Override
+  public byte[] getMinBytes() {
+    return BytesUtils.intToBytes(Float.floatToIntBits(min));
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue()) {
+      return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls());
+    } else if (!this.isEmpty()) {
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    } else {
+      return "no stats for this column";
+    }
+  }
+
+  /**
+   * Widens the tracked range so that it covers {@code [min_value, max_value]}.
+   *
+   * <p>Values are ordered with {@link Float#compare}, the same total order as
+   * {@link Float#compareTo} (the Comparable ordering of the boxed values returned by
+   * {@link #genericGetMin()} / {@link #genericGetMax()}): NaN sorts above every other
+   * value and -0.0f sorts below 0.0f. The primitive {@code <} / {@code >} operators are
+   * false for every comparison involving NaN, so with them a NaN seen first would
+   * become both min and max and freeze the statistics, making the result depend on
+   * the order in which values arrive.
+   */
+  public void updateStats(float min_value, float max_value) {
+    if (Float.compare(min_value, min) < 0) {
+      min = min_value;
+    }
+    if (Float.compare(max_value, max) > 0) {
+      max = max_value;
+    }
+  }
+
+  /**
+   * Overwrites min and max and marks the statistics as holding non-null values.
+   */
+  public void initializeStats(float min_value, float max_value) {
+    min = min_value;
+    max = max_value;
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public Float genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Float genericGetMax() {
+    return max;
+  }
+
+  public float getMax() {
+    return max;
+  }
+
+  public float getMin() {
+    return min;
+  }
+
+  /**
+   * Sets min and max directly and marks the statistics as holding non-null values.
+   */
+  public void setMinMax(float min, float max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
new file mode 100644
index 0000000..8deb28a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class IntStatistics extends Statistics<Integer> {
+
+ private int max;
+ private int min;
+
+ @Override
+ public void updateStats(int value) {
+ if (!this.hasNonNullValue()) {
+ initializeStats(value, value);
+ } else {
+ updateStats(value, value);
+ }
+ }
+
+ @Override
+ public void mergeStatisticsMinMax(Statistics stats) {
+ IntStatistics intStats = (IntStatistics)stats;
+ if (!this.hasNonNullValue()) {
+ initializeStats(intStats.getMin(), intStats.getMax());
+ } else {
+ updateStats(intStats.getMin(), intStats.getMax());
+ }
+ }
+
+ @Override
+ public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+ max = BytesUtils.bytesToInt(maxBytes);
+ min = BytesUtils.bytesToInt(minBytes);
+ this.markAsNotEmpty();
+ }
+
+ @Override
+ public byte[] getMaxBytes() {
+ return BytesUtils.intToBytes(max);
+ }
+
+ @Override
+ public byte[] getMinBytes() {
+ return BytesUtils.intToBytes(min);
+ }
+
+ @Override
+ public String toString() {
+ if (this.hasNonNullValue())
+ return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
+ else if (!this.isEmpty())
+ return String.format("num_nulls: %d, min/max is not defined", this.getNumNulls());
+ else
+ return "no stats for this column";
+ }
+
+ public void updateStats(int min_value, int max_value) {
+ if (min_value < min) { min = min_value; }
+ if (max_value > max) { max = max_value; }
+ }
+
+ public void initializeStats(int min_value, int max_value) {
+ min = min_value;
+ max = max_value;
+ this.markAsNotEmpty();
+ }
+
+ @Override
+ public Integer genericGetMin() {
+ return min;
+ }
+
+ @Override
+ public Integer genericGetMax() {
+ return max;
+ }
+
+ public int getMax() {
+ return max;
+ }
+
+ public int getMin() {
+ return min;
+ }
+
+ public void setMinMax(int min, int max) {
+ this.max = max;
+ this.min = min;
+ this.markAsNotEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
new file mode 100644
index 0000000..a8c177e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+public class LongStatistics extends Statistics<Long> {
+
+ private long max;
+ private long min;
+
+ @Override
+ public void updateStats(long value) {
+ if (!this.hasNonNullValue()) {
+ initializeStats(value, value);
+ } else {
+ updateStats(value, value);
+ }
+ }
+
+ @Override
+ public void mergeStatisticsMinMax(Statistics stats) {
+ LongStatistics longStats = (LongStatistics)stats;
+ if (!this.hasNonNullValue()) {
+ initializeStats(longStats.getMin(), longStats.getMax());
+ } else {
+ updateStats(longStats.getMin(), longStats.getMax());
+ }
+ }
+
+ @Override
+ public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+ max = BytesUtils.bytesToLong(maxBytes);
+ min = BytesUtils.bytesToLong(minBytes);
+ this.markAsNotEmpty();
+ }
+
+ @Override
+ public byte[] getMaxBytes() {
+ return BytesUtils.longToBytes(max);
+ }
+
+ @Override
+ public byte[] getMinBytes() {
+ return BytesUtils.longToBytes(min);
+ }
+
+ @Override
+ public String toString() {
+ if (this.hasNonNullValue())
+ return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls());
+ else if (!this.isEmpty())
+ return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+ else
+ return "no stats for this column";
+ }
+
+ public void updateStats(long min_value, long max_value) {
+ if (min_value < min) { min = min_value; }
+ if (max_value > max) { max = max_value; }
+ }
+
+ public void initializeStats(long min_value, long max_value) {
+ min = min_value;
+ max = max_value;
+ this.markAsNotEmpty();
+ }
+
+ @Override
+ public Long genericGetMin() {
+ return min;
+ }
+
+ @Override
+ public Long genericGetMax() {
+ return max;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public long getMin() {
+ return min;
+ }
+
+ public void setMinMax(long min, long max) {
+ this.max = max;
+ this.min = min;
+ this.markAsNotEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
new file mode 100644
index 0000000..ba135f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.column.UnknownColumnTypeException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import java.util.Arrays;
+
+
+/**
+ * Statistics class to keep track of statistics in parquet pages and column chunks
+ *
+ * @author Katya Gonina
+ */
+public abstract class Statistics<T extends Comparable<T>> {
+
+ private boolean hasNonNullValue;
+ private long num_nulls;
+
+ public Statistics() {
+ hasNonNullValue = false;
+ num_nulls = 0;
+ }
+
+ /**
+ * Returns the typed statistics object based on the passed type parameter
+ * @param type PrimitiveTypeName type of the column
+ * @return instance of a typed statistics class
+ */
+ public static Statistics getStatsBasedOnType(PrimitiveTypeName type) {
+ switch(type) {
+ case INT32:
+ return new IntStatistics();
+ case INT64:
+ return new LongStatistics();
+ case FLOAT:
+ return new FloatStatistics();
+ case DOUBLE:
+ return new DoubleStatistics();
+ case BOOLEAN:
+ return new BooleanStatistics();
+ case BINARY:
+ return new BinaryStatistics();
+ case INT96:
+ return new BinaryStatistics();
+ case FIXED_LEN_BYTE_ARRAY:
+ return new BinaryStatistics();
+ default:
+ throw new UnknownColumnTypeException(type);
+ }
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(int value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(float value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * updates statistics min and max using the passed value
+ * @param value value to use to update min and max
+ */
+ public void updateStats(Binary value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Equality comparison method to compare two statistics objects.
+ * @param stats Statistics object to compare against
+ * @return true if objects are equal, false otherwise
+ */
+ public boolean equals(Statistics stats) {
+ return Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) &&
+ Arrays.equals(stats.getMinBytes(), this.getMinBytes()) &&
+ stats.getNumNulls() == this.getNumNulls();
+ }
+
+ /**
+ * Hash code for the statistics object
+ * @return hash code int
+ */
+ public int hashCode() {
+ return 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes()) + Long.valueOf(this.getNumNulls()).hashCode();
+ }
+
+ /**
+ * Method to merge this statistics object with the object passed
+ * as parameter. Merging keeps the smallest of min values, largest of max
+ * values and combines the number of null counts.
+ * @param stats Statistics object to merge with
+ */
+ public void mergeStatistics(Statistics stats) {
+ if (stats.isEmpty()) return;
+
+ if (this.getClass() == stats.getClass()) {
+ incrementNumNulls(stats.getNumNulls());
+ if (stats.hasNonNullValue()) {
+ mergeStatisticsMinMax(stats);
+ markAsNotEmpty();
+ }
+ } else {
+ throw new StatisticsClassException(this.getClass().toString(), stats.getClass().toString());
+ }
+ }
+
+ /**
+ * Abstract method to merge this statistics min and max with the values
+ * of the parameter object. Does not do any checks, only called internally.
+ * @param stats Statistics object to merge with
+ */
+ abstract protected void mergeStatisticsMinMax(Statistics stats);
+
+ /**
+ * Abstract method to set min and max values from byte arrays.
+ * @param minBytes byte array to set the min value to
+ * @param maxBytes byte array to set the max value to
+ */
+ abstract public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
+
+ abstract public T genericGetMin();
+ abstract public T genericGetMax();
+
+ /**
+ * Abstract method to return the max value as a byte array
+ * @return byte array corresponding to the max value
+ */
+ abstract public byte[] getMaxBytes();
+
+ /**
+ * Abstract method to return the min value as a byte array
+ * @return byte array corresponding to the min value
+ */
+ abstract public byte[] getMinBytes();
+
+ /**
+ * toString() to display min, max, num_nulls in a string
+ */
+ abstract public String toString();
+
+
+ /**
+ * Increments the null count by one
+ */
+ public void incrementNumNulls() {
+ num_nulls++ ;
+ }
+
+ /**
+ * Increments the null count by the parameter value
+ * @param increment value to increment the null count by
+ */
+ public void incrementNumNulls(long increment) {
+ num_nulls += increment ;
+ }
+
+ /**
+ * Returns the null count
+ * @return null count
+ */
+ public long getNumNulls() {
+ return num_nulls;
+ }
+
+ /**
+ * Sets the number of nulls to the parameter value
+ * @param nulls null count to set the count to
+ */
+ public void setNumNulls(long nulls) {
+ num_nulls = nulls;
+ }
+
+ /**
+ * Returns a boolean specifying if the Statistics object is empty,
+ * i.e does not contain valid statistics for the page/column yet
+ * @return true if object is empty, false otherwise
+ */
+ public boolean isEmpty() {
+ return !hasNonNullValue && num_nulls == 0;
+ }
+
+ /**
+ * Returns whether there have been non-null values added to this statistics
+ */
+ public boolean hasNonNullValue() {
+ return hasNonNullValue;
+ }
+
+ /**
+ * Sets the page/column as having a valid non-null value
+ * kind of misnomer here
+ */
+ protected void markAsNotEmpty() {
+ hasNonNullValue = true;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
new file mode 100644
index 0000000..a242737
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Raised when two Statistics objects of different classes are combined,
+ * for example by Statistics.mergeStatistics.
+ *
+ * @author Katya Gonina
+ */
+public class StatisticsClassException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param className1 description of the first statistics class involved
+   * @param className2 description of the second statistics class involved
+   */
+  public StatisticsClassException(String className1, String className2) {
+    super(String.format("Statistics classes mismatched: %s vs. %s", className1, className2));
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
new file mode 100644
index 0000000..f491233
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+
+/**
+ *
+ * Used to add extra behavior to a ValuesWriter whose encoding may need to fall back
+ * to a different encoding after it has started writing values.
+ *
+ * @see FallbackValuesWriter
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface RequiresFallback {
+
+ /**
+ * In the case of a dictionary based encoding we will fall back if the dictionary becomes too big.
+ * @return true to notify the parent that we should fall back to another encoding
+ */
+ boolean shouldFallBack();
+
+ /**
+ * Before writing the first page we verify whether the encoding is worth it,
+ * and fall back if a simpler encoding would be better in that case.
+ * @param rawSize the size if encoded with plain
+ * @param encodedSize the size as encoded by the current encoding
+ * @return true if we keep this encoding
+ */
+ boolean isCompressionSatisfying(long rawSize, long encodedSize);
+
+ /**
+ * When falling back to a different encoding we must re-encode all the values seen so far.
+ * @param writer the new encoder to write the current values to
+ */
+ void fallBackAllValuesTo(ValuesWriter writer);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
new file mode 100644
index 0000000..a3d8920
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import java.io.IOException;
+
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Base class to implement an encoding for a given column type.
+ *
+ * A ValuesReader is provided with a page (byte-array) and is responsible
+ * for deserializing the primitive values stored in that page.
+ *
+ * Given that pages are homogeneous (store only a single type), typical subclasses
+ * will only override one of the read*() methods; the others keep the default
+ * implementation, which throws UnsupportedOperationException.
+ *
+ * @author Julien Le Dem
+ */
+public abstract class ValuesReader {
+
+ /**
+ * Called to initialize the column reader from a part of a page.
+ *
+ * The underlying implementation knows how much data to read, so a length
+ * is not provided.
+ *
+ * Each page may contain several sections:
+ * <ul>
+ * <li> repetition levels column
+ * <li> definition levels column
+ * <li> data column
+ * </ul>
+ *
+ * This function is called with 'offset' pointing to the beginning of one of these sections.
+ * It returns nothing; for implementations that support it, the offset of the section
+ * following it can be obtained afterwards from {@link #getNextOffset()}.
+ *
+ * @param valueCount count of values in this page
+ * @param page the array to read from containing the page data (repetition levels, definition levels, data)
+ * @param offset where to start reading from in the page
+ *
+ * @throws IOException if the implementation fails while reading the page data
+ */
+ public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
+
+ /**
+ * Called to return offset of the next section, i.e. the position just after the
+ * section this reader was initialized with.
+ * @return offset of the next section
+ * @throws ParquetDecodingException always, in this default implementation;
+ *         readers that can report the offset override this method
+ */
+ public int getNextOffset() {
+ throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
+ }
+
+ /**
+ * usable when the encoding is dictionary based
+ * @return the id of the next value from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public int readValueDictionaryId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next boolean from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public boolean readBoolean() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next Binary from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public Binary readBytes() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next float from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public float readFloat() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next double from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public double readDouble() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next integer from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public int readInteger() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @return the next long from the page
+ * @throws UnsupportedOperationException in this default implementation
+ */
+ public long readLong() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Skips the next value in the page without returning it
+ */
+ abstract public void skip();
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
new file mode 100644
index 0000000..c8f31b9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Base class to implement an encoding for a given column.
+ * <p>
+ * Subclasses override the {@code write*} methods for the value types their encoding
+ * supports; the default implementations throw {@link UnsupportedOperationException}
+ * with the concrete class name as the message.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class ValuesWriter {
+
+  /**
+   * Used to decide when to move on to the next page.
+   * @return the size of the currently buffered data (in bytes)
+   */
+  public abstract long getBufferedSize();
+
+
+  // TODO: maybe consolidate into a getPage
+  /**
+   * @return the bytes buffered so far to write to the current page
+   */
+  public abstract BytesInput getBytes();
+
+  /**
+   * Called after getBytes() and before reset().
+   * @return the encoding that was used to encode the bytes
+   */
+  public abstract Encoding getEncoding();
+
+  /**
+   * Called after getBytes() to reset the current buffer and start writing the next page.
+   */
+  public abstract void reset();
+
+  /**
+   * @return the dictionary page or null if not dictionary based
+   */
+  public DictionaryPage createDictionaryPage() {
+    return null;
+  }
+
+  /**
+   * Resets the dictionary when a new block starts. No-op by default.
+   */
+  public void resetDictionary() {
+  }
+
+  /**
+   * @return the allocated size of the buffer; expected to be greater than
+   *         {@link #getBufferedSize()}
+   */
+  public abstract long getAllocatedSize();
+
+  /**
+   * @param value the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeByte(int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeBoolean(boolean v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeBytes(Binary v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeInteger(int v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeLong(long v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeDouble(double v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param v the value to encode
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void writeFloat(float v) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param prefix text prepended to each line of the report
+   * @return a human-readable description of this writer's memory usage
+   */
+  public abstract String memUsageString(String prefix);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
new file mode 100644
index 0000000..f713263
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * a column reader that packs the ints in the number of bits required based on the maximum size.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BitPackingValuesReader extends ValuesReader {
+ private static final Log LOG = Log.getLog(BitPackingValuesReader.class);
+
+ private ByteArrayInputStream in;
+ private BitPackingReader bitPackingReader;
+ private final int bitsPerValue;
+ private int nextOffset;
+
+ /**
+ * @param bound the maximum value stored by this column
+ */
+ public BitPackingValuesReader(int bound) {
+ this.bitsPerValue = getWidthFromMaxInt(bound);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see org.apache.parquet.column.values.ValuesReader#readInteger()
+ */
+ @Override
+ public int readInteger() {
+ try {
+ return bitPackingReader.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
+ */
+ @Override
+ public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+ int effectiveBitLength = valueCount * bitsPerValue;
+ int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
+ if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
+ this.in = new ByteArrayInputStream(in, offset, length);
+ this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
+ this.nextOffset = offset + length;
+ }
+
+ @Override
+ public int getNextOffset() {
+ return nextOffset;
+ }
+
+ @Override
+ public void skip() {
+ readInteger();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
new file mode 100644
index 0000000..24436ef
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.values.bitpacking.BitPacking.getBitPackingWriter;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * A column writer that packs the ints in the number of bits required based on the maximum size.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BitPackingValuesWriter extends ValuesWriter {
+
+  private final CapacityByteArrayOutputStream out;
+  private final int bitsPerValue;
+  // recreated by reset(), so it cannot be final
+  private BitPackingWriter bitPackingWriter;
+
+  /**
+   * @param bound the maximum value stored by this column; determines the bit width
+   * @param initialCapacity initial capacity passed to the underlying CapacityByteArrayOutputStream
+   * @param pageSize page size passed to the underlying CapacityByteArrayOutputStream
+   */
+  public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
+    this.bitsPerValue = getWidthFromMaxInt(bound);
+    this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+    init();
+  }
+
+  /** Creates a fresh bit-packing writer on top of {@link #out}. */
+  private void init() {
+    this.bitPackingWriter = getBitPackingWriter(bitsPerValue, out);
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#writeInteger(int)
+   */
+  @Override
+  public void writeInteger(int v) {
+    try {
+      bitPackingWriter.write(v);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#getBufferedSize()
+   */
+  @Override
+  public long getBufferedSize() {
+    return out.size();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Calls {@code finish()} on the bit-packing writer before exposing the buffered bytes.
+   * @see org.apache.parquet.column.values.ValuesWriter#getBytes()
+   */
+  @Override
+  public BytesInput getBytes() {
+    try {
+      this.bitPackingWriter.finish();
+      return BytesInput.from(out);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#reset()
+   */
+  @Override
+  public void reset() {
+    out.reset();
+    init();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @see org.apache.parquet.column.values.ValuesWriter#getAllocatedSize()
+   */
+  @Override
+  public long getAllocatedSize() {
+    return out.getCapacity();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return out.memUsageString(prefix);
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return BIT_PACKED;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
new file mode 100644
index 0000000..c0ab7e0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * Reads bit-packed ints eight at a time using a {@link BytePacker} whose width is
+ * derived from the maximum value (bound) of the column.
+ */
+public class ByteBitPackingValuesReader extends ValuesReader {
+  // unpack8Values() always decodes exactly this many values per call
+  private static final int VALUES_AT_A_TIME = 8;
+
+  private static final Log LOG = Log.getLog(ByteBitPackingValuesReader.class);
+
+  private final int bitWidth;
+  private final BytePacker packer;
+  private final int[] decoded = new int[VALUES_AT_A_TIME];
+  // index of the last value handed out; starts at the end so the first read triggers a decode
+  private int decodedPosition = VALUES_AT_A_TIME - 1;
+  private byte[] encoded;
+  private int encodedPos;
+  private int nextOffset;
+
+  public ByteBitPackingValuesReader(int bound, Packer packer) {
+    this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
+    this.packer = packer.newBytePacker(bitWidth);
+  }
+
+  @Override
+  public int readInteger() {
+    decodedPosition++;
+    if (decodedPosition == decoded.length) {
+      unpackNextBatch();
+    }
+    return decoded[decodedPosition];
+  }
+
+  /**
+   * Decodes the next group of values (bitWidth encoded bytes) into {@code decoded}
+   * and rewinds the read cursor to its start.
+   */
+  private void unpackNextBatch() {
+    boolean pastEnd = encodedPos + bitWidth > encoded.length;
+    if (pastEnd) {
+      // copyOfRange zero-fills beyond the end of the source array
+      byte[] padded = Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth);
+      packer.unpack8Values(padded, 0, decoded, 0);
+    } else {
+      packer.unpack8Values(encoded, encodedPos, decoded, 0);
+    }
+    encodedPos += bitWidth;
+    decodedPosition = 0;
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset)
+      throws IOException {
+    final int byteLength = BytesUtils.paddedByteCountFromBits(valueCount * bitWidth); // ceil
+    if (Log.DEBUG) LOG.debug("reading " + byteLength + " bytes for " + valueCount + " values of size " + bitWidth + " bits." );
+    encoded = page;
+    encodedPos = offset;
+    decodedPosition = VALUES_AT_A_TIME - 1;
+    nextOffset = offset + byteLength;
+  }
+
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+
+  @Override
+  public void skip() {
+    readInteger();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
new file mode 100644
index 0000000..d0240bb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * BIT_PACKED values writer backed by a {@link ByteBasedBitPackingEncoder}; the bit
+ * width is derived from the maximum value (bound) of the column.
+ */
+public class ByteBitPackingValuesWriter extends ValuesWriter {
+
+  private final Packer packer;
+  private final int bitWidth;
+  // replaced with a fresh encoder on every reset()
+  private ByteBasedBitPackingEncoder encoder;
+
+  public ByteBitPackingValuesWriter(int bound, Packer packer) {
+    this.packer = packer;
+    this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
+    this.encoder = newEncoder();
+  }
+
+  /** Builds an empty encoder for this writer's bit width and packer. */
+  private ByteBasedBitPackingEncoder newEncoder() {
+    return new ByteBasedBitPackingEncoder(bitWidth, packer);
+  }
+
+  @Override
+  public void writeInteger(int v) {
+    try {
+      encoder.writeInt(v);
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  @Override
+  public BytesInput getBytes() {
+    try {
+      return encoder.toBytes();
+    } catch (IOException e) {
+      throw new ParquetEncodingException(e);
+    }
+  }
+
+  @Override
+  public void reset() {
+    encoder = newEncoder();
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    return BIT_PACKED;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return encoder.getBufferSize();
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return encoder.getAllocatedSize();
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return encoder.memUsageString(prefix);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
new file mode 100644
index 0000000..865eea2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import java.io.IOException;
+
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Reads single bits, fixed-width integers, bytes and unsigned varints from a byte
+ * array. Bits are consumed from the least significant bit of each byte upwards.
+ */
+class BitReader {
+  // byte currently being consumed; bits below currentPosition are already used
+  private int currentByte = 0;
+  // 8 means "no bits left, fetch the next byte"
+  private int currentPosition = 8;
+  private byte[] buf;
+  private int currentBufferPosition = 0;
+  private static final int[] byteGetValueMask = new int[8];
+  private static final int[] readMask = new int[32];
+  private int endBufferPosition;
+
+  static {
+    // byteGetValueMask[i] == 1 << i
+    int currentMask = 1;
+    for (int i = 0; i < byteGetValueMask.length; i++) {
+      byteGetValueMask[i] = currentMask;
+      currentMask <<= 1;
+    }
+    // readMask[i] == (1 << i) - 1, i.e. the low i bits set
+    currentMask = 0;
+    for (int i = 0; i < readMask.length; i++) {
+      readMask[i] = currentMask;
+      currentMask <<= 1;
+      currentMask += 1;
+    }
+  }
+
+  /**
+   * Prepare to deserialize bit-packed integers from the given array.
+   * The array is not copied, so must not be mutated during the course of
+   * reading.
+   * @param buf the source bytes
+   * @param offset index of the first byte to read
+   * @param length number of readable bytes; reads past {@code offset + length} yield zero bytes
+   */
+  public void prepare(byte[] buf, int offset, int length) {
+    this.buf = buf;
+    this.endBufferPosition = offset + length;
+    currentByte = 0;
+    currentPosition = 8;
+    currentBufferPosition = offset;
+  }
+
+  /**
+   * Extract the given bit index from the given value.
+   */
+  private static boolean extractBit(int val, int bit) {
+    return (val & byteGetValueMask[bit]) != 0;
+  }
+
+  /**
+   * Read an integer from the stream which is represented by a specified
+   * number of bits.
+   * @param bitsPerValue the number of bits used to represent the integer (at most 31)
+   * @return the decoded non-negative value
+   */
+  public int readNBitInteger(int bitsPerValue) {
+    int bits = bitsPerValue + currentPosition;
+    int currentValue = currentByte >>> currentPosition;
+    int toShift = 8 - currentPosition;
+    while (bits >= 8) {
+      currentByte = getNextByte();
+      currentValue |= currentByte << toShift;
+      toShift += 8;
+      bits -= 8;
+    }
+    currentValue &= readMask[bitsPerValue];
+    currentPosition = (bitsPerValue + currentPosition) % 8;
+    return currentValue;
+  }
+
+  /** @return the next unsigned byte, or 0 once the prepared range is exhausted */
+  private int getNextByte() {
+    if (currentBufferPosition < endBufferPosition) {
+      return buf[currentBufferPosition++] & 0xFF;
+    }
+    return 0;
+  }
+
+  /** @return the next single bit */
+  public boolean readBit() throws IOException {
+    if (currentPosition == 8) {
+      currentByte = getNextByte();
+      currentPosition = 0;
+    }
+    return extractBit(currentByte, currentPosition++);
+  }
+
+  /** @return the next 8 bits, starting at the current bit position */
+  public int readByte() {
+    currentByte |= (getNextByte() << 8);
+    int value = (currentByte >>> currentPosition) & 0xFF;
+    currentByte >>>= 8;
+    return value;
+  }
+
+  /**
+   * Reads an unsigned LEB128-style varint: 7 payload bits per byte, high bit set on
+   * every byte except the last.
+   * @return the decoded value
+   * @throws ParquetDecodingException if the varint is longer than 5 bytes
+   */
+  public int readUnsignedVarint() throws IOException {
+    int value = 0;
+    int i = 0;
+    int b;
+    while (((b = readByte()) & 0x80) != 0) {
+      value |= (b & 0x7F) << i;
+      i += 7;
+      // A 32-bit value needs at most 4 continuation bytes (shifts 0..21) plus a final
+      // byte shifted by 28. Allowing a 5th continuation byte would make the final
+      // shift 35, which Java silently reduces modulo 32, corrupting the result.
+      if (i > 28) {
+        throw new ParquetDecodingException("Variable length quantity is too long");
+      }
+    }
+    return value | (b << i);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
new file mode 100644
index 0000000..1d1d9d1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+
+/**
+ * Accumulates single bits, fixed-width integers, bytes and unsigned varints into a
+ * {@link CapacityByteArrayOutputStream}, filling each byte from its least significant
+ * bit upwards.
+ */
+class BitWriter {
+  private static final Log LOG = Log.getLog(BitWriter.class);
+  private static final boolean DEBUG = false;//Log.DEBUG;
+
+  private CapacityByteArrayOutputStream baos;
+  // partially filled byte; only its low currentBytePosition bits are meaningful
+  private int currentByte = 0;
+  // number of bits already used in currentByte
+  private int currentBytePosition = 0;
+  private static final int[] byteToTrueMask = new int[8];
+  private static final int[] byteToFalseMask = new int[8];
+  private boolean finished = false;
+  static {
+    int currentMask = 1;
+    for (int i = 0; i < byteToTrueMask.length; i++) {
+      byteToTrueMask[i] = currentMask;
+      byteToFalseMask[i] = ~currentMask;
+      currentMask <<= 1;
+    }
+  }
+
+  public BitWriter(int initialCapacity, int pageSize) {
+    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+  }
+
+  /** Appends a single bit, flushing the current byte once it is full. */
+  public void writeBit(boolean bit) {
+    if (DEBUG) LOG.debug("writing: " + (bit ? "1" : "0"));
+    currentByte = setBytePosition(currentByte, currentBytePosition++, bit);
+    if (currentBytePosition == 8) {
+      baos.write(currentByte);
+      if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+      currentByte = 0;
+      currentBytePosition = 0;
+    }
+  }
+
+  /** Appends the low 8 bits of {@code val}, starting at the current bit position. */
+  public void writeByte(int val) {
+    if (DEBUG) LOG.debug("writing: " + toBinary(val) + " (" + val + ")");
+    currentByte |= ((val & 0xFF) << currentBytePosition);
+    baos.write(currentByte);
+    if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+    currentByte >>>= 8;
+  }
+
+  /**
+   * Write the given integer, serialized using the given number of bits.
+   * It is assumed that the integer can be correctly serialized within
+   * the provided bit size.
+   * @param val the value to serialize
+   * @param bitsToWrite the number of bits to use
+   */
+  public void writeNBitInteger(int val, int bitsToWrite) {
+    if (DEBUG) LOG.debug("writing: " + toBinary(val, bitsToWrite) + " (" + val + ")");
+    // Accumulate in a long: up to 7 pending bits plus up to 32 value bits can exceed
+    // 32 bits, and shifting an int would silently drop the high bits of wide values.
+    long buffer = (currentByte & 0xFFFFFFFFL) | ((val & 0xFFFFFFFFL) << currentBytePosition);
+    int bufferedBits = currentBytePosition + bitsToWrite;
+    while (bufferedBits >= 8) {
+      int lowByte = (int) (buffer & 0xFF);
+      baos.write(lowByte);
+      if (DEBUG) LOG.debug("to buffer: " + toBinary(lowByte));
+      buffer >>>= 8;
+      bufferedBits -= 8;
+    }
+    currentByte = (int) buffer;
+    currentBytePosition = bufferedBits;
+  }
+
+  private String toBinary(int val, int alignTo) {
+    String result = Integer.toBinaryString(val);
+    while (result.length() < alignTo) {
+      result = "0" + result;
+    }
+    return result;
+  }
+
+  private String toBinary(int val) {
+    return toBinary(val, 8);
+  }
+
+  /**
+   * Flushes a partially filled byte (only on the first call) and returns the
+   * buffered bytes.
+   */
+  public BytesInput finish() {
+    if (!finished) {
+      if (currentBytePosition > 0) {
+        baos.write(currentByte);
+        if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte));
+      }
+    }
+    finished = true;
+    return BytesInput.from(baos);
+  }
+
+  /** Discards all buffered data so the writer can be reused. */
+  public void reset() {
+    baos.reset();
+    currentByte = 0;
+    currentBytePosition = 0;
+    finished = false;
+  }
+
+  /**
+   * Set or clear the given bit position in the given byte.
+   * @param currentByte the byte to mutate
+   * @param bitOffset the bit to set or clear
+   * @param newBitValue whether to set or clear the bit
+   * @return the mutated byte
+   */
+  private static int setBytePosition(int currentByte, int bitOffset, boolean newBitValue) {
+    if (newBitValue) {
+      currentByte |= byteToTrueMask[bitOffset];
+    } else {
+      currentByte &= byteToFalseMask[bitOffset];
+    }
+    return currentByte;
+  }
+
+  //This assumes you will never give it a negative value
+  public void writeUnsignedVarint(int value) {
+    while ((value & 0xFFFFFF80) != 0L) {
+      writeByte((value & 0x7F) | 0x80);
+      value >>>= 7;
+    }
+    writeByte(value & 0x7F);
+  }
+
+  public int getMemSize() {
+    // baos = 8 bytes
+    // currentByte + currentBytePosition = 8 bytes
+    // the size of baos:
+    //   count : 4 bytes (rounded to 8)
+    //   buf : 12 bytes (8 ptr + 4 length) should technically be rounded to 8 depending on buffer size
+    return 32 + (int)baos.size();
+  }
+
+  public int getCapacity() {
+    return baos.getCapacity();
+  }
+
+  public String memUsageString(String prefix) {
+    return baos.memUsageString(prefix);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
new file mode 100644
index 0000000..3ace6e2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Chooses reader/writer implementations for integers bounded by a known maximum.
+ * A bound of 0 selects {@code ZeroIntegerValuesReader} / {@code DevNullValuesWriter};
+ * any other bound selects the bounded-int reader/writer.
+ */
+public abstract class BoundedIntValuesFactory {
+  public static ValuesReader getBoundedReader(int bound) {
+    if (bound == 0) {
+      return new ZeroIntegerValuesReader();
+    }
+    return new BoundedIntValuesReader(bound);
+  }
+
+  public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
+    if (bound == 0) {
+      return new DevNullValuesWriter();
+    }
+    return new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
new file mode 100644
index 0000000..dda4431
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.Log.DEBUG;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Reads integers bounded by a known maximum, encoded as a mix of single values and
+ * runs of a repeated value.
+ * @see BoundedIntValuesWriter
+ */
+class BoundedIntValuesReader extends ValuesReader {
+  private static final Log LOG = Log.getLog(BoundedIntValuesReader.class);
+
+  // remaining repetitions of currentValue still to be returned
+  private int currentValueCt = 0;
+  private int currentValue = 0;
+  private final int bitsPerValue;
+  private BitReader bitReader = new BitReader();
+  private int nextOffset;
+
+  /**
+   * @param bound the maximum value; must not be 0
+   * @throws ParquetDecodingException if {@code bound} is 0
+   */
+  public BoundedIntValuesReader(int bound) {
+    if (bound == 0) {
+      throw new ParquetDecodingException("Value bound cannot be 0. Use ZeroIntegerValuesReader instead.");
+    }
+    bitsPerValue = BytesUtils.getWidthFromMaxInt(bound);
+  }
+
+  @Override
+  public int readInteger() {
+    try {
+      if (currentValueCt > 0) {
+        currentValueCt--;
+        return currentValue;
+      }
+      // Each entry starts with a flag bit followed by the value. When the flag is set,
+      // a varint repeat count follows; this call returns the first occurrence and the
+      // remaining count - 1 are served from currentValueCt.
+      boolean isRun = bitReader.readBit();
+      currentValue = bitReader.readNBitInteger(bitsPerValue);
+      if (isRun) {
+        currentValueCt = bitReader.readUnsignedVarint() - 1;
+      }
+      return currentValue;
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read int", e);
+    }
+  }
+
+  // The section starts with a 4-byte little-endian length followed by that many bytes
+  // of encoded data; only that range is handed to the BitReader.
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
+    int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
+    if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
+    currentValueCt = 0;
+    currentValue = 0;
+    bitReader.prepare(in, offset + 4, totalBytes);
+    if (DEBUG) LOG.debug("will read next from " + (offset + totalBytes + 4));
+    this.nextOffset = offset + totalBytes + 4;
+  }
+
+  @Override
+  public int getNextOffset() {
+    return this.nextOffset;
+  }
+
+  @Override
+  public void skip() {
+    readInteger();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
new file mode 100644
index 0000000..0acaaf7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.bytes.BytesInput.concat;
+import static org.apache.parquet.column.Encoding.RLE;
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bitpacking.BitPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * This is a special ColumnWriter for the case when you need to write
+ * integers in a known range. This is intended primarily for use with
+ * repetition and definition levels, since the maximum value that will
+ * be written is known a priori based on the schema. Assumption is that
+ * the values written are between 0 and the bound, inclusive.
+ *
+ * This differs from {@link BitPackingValuesWriter} in that this also performs
+ * run-length encoding of the data, so is useful when long runs of repeated
+ * values are expected.
+ */
+class BoundedIntValuesWriter extends ValuesWriter {
+ private static final Log LOG = Log.getLog(BoundedIntValuesWriter.class);
+
+ private int currentValue = -1;
+ private int currentValueCt = -1;
+ private boolean currentValueIsRepeated = false;
+ private boolean thereIsABufferedValue = false;
+ private int shouldRepeatThreshold = 0;
+ private int bitsPerValue;
+ private BitWriter bitWriter;
+ private boolean isFirst = true;
+
+ private static final int[] byteToTrueMask = new int[8];
+ static {
+ int currentMask = 1;
+ for (int i = 0; i < byteToTrueMask.length; i++) {
+ byteToTrueMask[i] = currentMask;
+ currentMask <<= 1;
+ }
+ }
+
+ public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
+ if (bound == 0) {
+ throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
+ }
+ this.bitWriter = new BitWriter(initialCapacity, pageSize);
+ bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
+ shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
+ if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
+ }
+
+ @Override
+ public long getBufferedSize() {
+ // currentValue + currentValueCt = 8 bytes
+ // shouldRepeatThreshold + bitsPerValue = 8 bytes
+ // bitWriter = 8 bytes
+ // currentValueIsRepeated + isFirst = 2 bytes (rounded to 8 b/c of word boundaries)
+ return 32 + (bitWriter == null ? 0 : bitWriter.getMemSize());
+ }
+
+ // This assumes that the full state must be serialized, since there is no close method
+ @Override
+ public BytesInput getBytes() {
+ serializeCurrentValue();
+ BytesInput buf = bitWriter.finish();
+ if (Log.DEBUG) LOG.debug("writing a buffer of size " + buf.size() + " + 4 bytes");
+ // We serialize the length so that on deserialization we can
+ // deserialize as we go, instead of having to load everything
+ // into memory
+ return concat(BytesInput.fromInt((int)buf.size()), buf);
+ }
+
+ @Override
+ public void reset() {
+ currentValue = -1;
+ currentValueCt = -1;
+ currentValueIsRepeated = false;
+ thereIsABufferedValue = false;
+ isFirst = true;
+ bitWriter.reset();
+ }
+
+ @Override
+ public void writeInteger(int val) {
+ if (currentValue == val) {
+ currentValueCt++;
+ if (!currentValueIsRepeated && currentValueCt >= shouldRepeatThreshold) {
+ currentValueIsRepeated = true;
+ }
+ } else {
+ if (!isFirst) {
+ serializeCurrentValue();
+ } else {
+ isFirst = false;
+ }
+
+ newCurrentValue(val);
+ }
+ }
+
+ private void serializeCurrentValue() {
+ if (thereIsABufferedValue) {
+ if (currentValueIsRepeated) {
+ bitWriter.writeBit(true);
+ bitWriter.writeNBitInteger(currentValue, bitsPerValue);
+ bitWriter.writeUnsignedVarint(currentValueCt);
+ } else {
+ for (int i = 0; i < currentValueCt; i++) {
+ bitWriter.writeBit(false);
+ bitWriter.writeNBitInteger(currentValue, bitsPerValue);
+ }
+ }
+ }
+ thereIsABufferedValue = false;
+ }
+
+ private void newCurrentValue(int val) {
+ currentValue = val;
+ currentValueCt = 1;
+ currentValueIsRepeated = false;
+ thereIsABufferedValue = true;
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ return bitWriter.getCapacity();
+ }
+
+ @Override
+ public Encoding getEncoding() {
+ return RLE;
+ }
+
+ @Override
+ public String memUsageString(String prefix) {
+ return bitWriter.memUsageString(prefix);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
new file mode 100644
index 0000000..af92941
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A {@link ValuesWriter} that accepts every value and stores none of them.
+ *
+ * The idea is that some columns always hold the same value, so there is nothing
+ * worth writing; the repetition levels of a schema without repeated fields are
+ * one example. Every write method is a no-op, the produced page body is empty,
+ * and no memory is reported as buffered or allocated.
+ */
+public class DevNullValuesWriter extends ValuesWriter {
+
+  // ---------------------------------------------------------------------
+  // Writes: all intentionally ignored.
+  // ---------------------------------------------------------------------
+
+  @Override
+  public void writeBoolean(boolean v) {
+  }
+
+  @Override
+  public void writeByte(int value) {
+  }
+
+  @Override
+  public void writeInteger(int v) {
+  }
+
+  @Override
+  public void writeLong(long v) {
+  }
+
+  @Override
+  public void writeFloat(float v) {
+  }
+
+  @Override
+  public void writeDouble(double v) {
+  }
+
+  @Override
+  public void writeBytes(Binary v) {
+  }
+
+  // ---------------------------------------------------------------------
+  // Page output and bookkeeping.
+  // ---------------------------------------------------------------------
+
+  /** @return an empty page body, since nothing was kept */
+  @Override
+  public BytesInput getBytes() {
+    return BytesInput.empty();
+  }
+
+  /** Nothing is buffered, so there is nothing to clear. */
+  @Override
+  public void reset() {
+  }
+
+  @Override
+  public Encoding getEncoding() {
+    // NOTE(review): BIT_PACKED is reported even though no bytes are written;
+    // confirm that readers of these columns rely on this value.
+    return BIT_PACKED;
+  }
+
+  @Override
+  public long getBufferedSize() {
+    return 0L;
+  }
+
+  @Override
+  public long getAllocatedSize() {
+    return 0L;
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    return prefix + "0";
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
new file mode 100644
index 0000000..9201596
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.boundedint;
+
+import java.io.IOException;
+
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * A {@link ValuesReader} that reads no actual data: it consumes no bytes from
+ * the page, and every call to {@link #readInteger()} returns 0.
+ * Mainly used to read definition levels when the only possible value is 0.
+ */
+public class ZeroIntegerValuesReader extends ValuesReader {
+
+  /** Offset passed to initFromPage; unchanged because no bytes are consumed. */
+  private int nextOffset;
+
+  /**
+   * @return always 0
+   */
+  @Override
+  public int readInteger() {
+    return 0;
+  }
+
+  /**
+   * Records {@code offset} as the start of the next section; this reader has
+   * no data of its own to consume.
+   */
+  @Override
+  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+    this.nextOffset = offset;
+  }
+
+  @Override
+  public int getNextOffset() {
+    return nextOffset;
+  }
+
+  /** No-op: there is no underlying position to advance. */
+  @Override
+  public void skip() {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
new file mode 100644
index 0000000..565d6ec
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Config for delta binary packing: the number of values per block and the
+ * number of equally sized mini blocks each block is divided into.
+ *
+ * @author Tianshuo Deng
+ */
+class DeltaBinaryPackingConfig {
+  final int blockSizeInValues;
+  final int miniBlockNumInABlock;
+  final int miniBlockSizeInValues;
+
+  /**
+   * Creates a config. Both counts must be positive, and dividing the block into
+   * {@code miniBlockNumInABlock} mini blocks must give a size that is a whole
+   * multiple of 8. Otherwise a {@code Preconditions.checkArgument} check fails.
+   *
+   * @param blockSizeInValues number of values in a block
+   * @param miniBlockNumInABlock number of mini blocks in a block
+   */
+  public DeltaBinaryPackingConfig(int blockSizeInValues, int miniBlockNumInABlock) {
+    // Both counts can come straight from a page header (see readConfig), so
+    // reject non-positive values: a zero block size would otherwise pass the
+    // multiple-of-8 check below and describe mini blocks that hold no values.
+    Preconditions.checkArgument(blockSizeInValues > 0,
+        "blockSizeInValues must be positive, but it's " + blockSizeInValues);
+    Preconditions.checkArgument(miniBlockNumInABlock > 0,
+        "miniBlockNumInABlock must be positive, but it's " + miniBlockNumInABlock);
+    this.blockSizeInValues = blockSizeInValues;
+    this.miniBlockNumInABlock = miniBlockNumInABlock;
+    double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+    // also rejects a non-integral mini block size (e.g. 2.5 % 8 != 0)
+    Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+    this.miniBlockSizeInValues = (int) miniSize;
+  }
+
+  /**
+   * Reads a config in the format produced by {@link #toBytesInput()}: the block
+   * size followed by the mini block count, each as an unsigned varint.
+   */
+  public static DeltaBinaryPackingConfig readConfig(InputStream in) throws IOException {
+    int blockSizeInValues = BytesUtils.readUnsignedVarInt(in);
+    int miniBlockNumInABlock = BytesUtils.readUnsignedVarInt(in);
+    return new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNumInABlock);
+  }
+
+  /** Serializes this config as two unsigned varints: block size, then mini block count. */
+  public BytesInput toBytesInput() {
+    return BytesInput.concat(
+        BytesInput.fromUnsignedVarInt(blockSizeInValues),
+        BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
new file mode 100644
index 0000000..c1678ae
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Read values written by {@link DeltaBinaryPackingValuesWriter}
+ *
+ * @author Tianshuo Deng
+ */
+public class DeltaBinaryPackingValuesReader extends ValuesReader {
+ private int totalValueCount;
+ /**
+ * values read by the caller
+ */
+ private int valuesRead;
+ private int minDeltaInCurrentBlock;
+ private byte[] page;
+ /**
+ * stores the decoded values including the first value which is written to the header
+ */
+ private int[] valuesBuffer;
+ /**
+ * values loaded to the buffer, it could be bigger than the totalValueCount
+ * when data is not aligned to mini block, which means padding 0s are in the buffer
+ */
+ private int valuesBuffered;
+ private ByteArrayInputStream in;
+ private int nextOffset;
+ private DeltaBinaryPackingConfig config;
+ private int[] bitWidths;
+
+ /**
+ * eagerly load all the data into memory
+ *
+ * @param valueCount count of values in this page
+ * @param page the array to read from containing the page data (repetition levels, definition levels, data)
+ * @param offset where to start reading from in the page
+ * @throws IOException
+ */
+ @Override
+ public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+ in = new ByteArrayInputStream(page, offset, page.length - offset);
+ this.config = DeltaBinaryPackingConfig.readConfig(in);
+ this.page = page;
+ this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
+ allocateValuesBuffer();
+ bitWidths = new int[config.miniBlockNumInABlock];
+
+ //read first value from header
+ valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarInt(in);
+
+ while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
+ loadNewBlockToBuffer();
+ }
+ this.nextOffset = page.length - in.available();
+ }
+
+ @Override
+ public int getNextOffset() {
+ return nextOffset;
+ }
+
+ /**
+ * the value buffer is allocated so that the size of it is multiple of mini block
+ * because when writing, data is flushed on a mini block basis
+ */
+ private void allocateValuesBuffer() {
+ int totalMiniBlockCount = (int) Math.ceil((double) totalValueCount / config.miniBlockSizeInValues);
+ //+ 1 because first value written to header is also stored in values buffer
+ valuesBuffer = new int[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
+ }
+
+ @Override
+ public void skip() {
+ checkRead();
+ valuesRead++;
+ }
+
+ @Override
+ public int readInteger() {
+ checkRead();
+ return valuesBuffer[valuesRead++];
+ }
+
+ private void checkRead() {
+ if (valuesRead >= totalValueCount) {
+ throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
+ }
+ }
+
+ private void loadNewBlockToBuffer() {
+ try {
+ minDeltaInCurrentBlock = BytesUtils.readZigZagVarInt(in);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("can not read min delta in current block", e);
+ }
+
+ readBitWidthsForMiniBlocks();
+
+ // mini block is atomic for reading, we read a mini block when there are more values left
+ int i;
+ for (i = 0; i < config.miniBlockNumInABlock && valuesBuffered < totalValueCount; i++) {
+ BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidths[i]);
+ unpackMiniBlock(packer);
+ }
+
+ //calculate values from deltas unpacked for current block
+ int valueUnpacked=i*config.miniBlockSizeInValues;
+ for (int j = valuesBuffered-valueUnpacked; j < valuesBuffered; j++) {
+ int index = j;
+ valuesBuffer[index] += minDeltaInCurrentBlock + valuesBuffer[index - 1];
+ }
+ }
+
+ /**
+ * mini block has a size of 8*n, unpack 8 value each time
+ *
+ * @param packer the packer created from bitwidth of current mini block
+ */
+ private void unpackMiniBlock(BytePacker packer) {
+ for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
+ unpack8Values(packer);
+ }
+ }
+
+ private void unpack8Values(BytePacker packer) {
+ //calculate the pos because the packer api uses array not stream
+ int pos = page.length - in.available();
+ packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+ this.valuesBuffered += 8;
+ //sync the pos in stream
+ in.skip(packer.getBitWidth());
+ }
+
+ private void readBitWidthsForMiniBlocks() {
+ for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+ try {
+ bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(in);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Can not decode bitwidth in block header", e);
+ }
+ }
+ }
+}