You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:40 UTC
[43/51] [partial] parquet-mr git commit: PARQUET-23: Rename to
org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
new file mode 100644
index 0000000..2fa63a8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.String.format;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+
+/**
+ * ColumnReader implementation
+ *
+ * @author Julien Le Dem
+ *
+ */
+class ColumnReaderImpl implements ColumnReader {
+ private static final Log LOG = Log.getLog(ColumnReaderImpl.class);
+
+  /**
+   * Connects the lower level page decoder to the record converter that materializes the records.
+   * The typed getters are unsupported by default; a concrete binding overrides the ones that
+   * apply to the values it reads.
+   *
+   * @author Julien Le Dem
+   *
+   */
+  private abstract static class Binding {
+
+    /**
+     * reads one value from the underlying page
+     */
+    abstract void read();
+
+    /**
+     * skips one value from the underlying page
+     */
+    abstract void skip();
+
+    /**
+     * writes the current value to the converter
+     */
+    abstract void writeValue();
+
+    /**
+     * @return the current value as a boolean
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public boolean getBoolean() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as an int
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public int getInteger() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a long
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public long getLong() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a float
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public float getFloat() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a double
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public double getDouble() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the current value as a Binary
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public Binary getBinary() {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @return the dictionary id of the current value
+     * @throws UnsupportedOperationException unless overridden by the concrete binding
+     */
+    public int getDictionaryId() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+ private final ColumnDescriptor path;
+ private final long totalValueCount;
+ private final PageReader pageReader;
+ private final Dictionary dictionary;
+
+ private IntIterator repetitionLevelColumn;
+ private IntIterator definitionLevelColumn;
+ protected ValuesReader dataColumn;
+
+ private int repetitionLevel;
+ private int definitionLevel;
+ private int dictionaryId;
+
+ private long endOfPageValueCount;
+ private int readValues;
+ private int pageValueCount;
+
+ private final PrimitiveConverter converter;
+ private Binding binding;
+
+ // this is needed because we will attempt to read the value twice when filtering
+ // TODO: rework that
+ private boolean valueRead;
+
+ private void bindToDictionary(final Dictionary dictionary) {
+ binding =
+ new Binding() {
+ void read() {
+ dictionaryId = dataColumn.readValueDictionaryId();
+ }
+ public void skip() {
+ dataColumn.skip();
+ }
+ public int getDictionaryId() {
+ return dictionaryId;
+ }
+ void writeValue() {
+ converter.addValueFromDictionary(dictionaryId);
+ }
+ public int getInteger() {
+ return dictionary.decodeToInt(dictionaryId);
+ }
+ public boolean getBoolean() {
+ return dictionary.decodeToBoolean(dictionaryId);
+ }
+ public long getLong() {
+ return dictionary.decodeToLong(dictionaryId);
+ }
+ public Binary getBinary() {
+ return dictionary.decodeToBinary(dictionaryId);
+ }
+ public float getFloat() {
+ return dictionary.decodeToFloat(dictionaryId);
+ }
+ public double getDouble() {
+ return dictionary.decodeToDouble(dictionaryId);
+ }
+ };
+ }
+
+  /**
+   * Installs a binding that reads typed values straight from the data column, chosen by the
+   * primitive type of the column. INT96 and FIXED_LEN_BYTE_ARRAY reuse the BINARY binding.
+   * Skipping a value resets the held value to its default before advancing the data column.
+   *
+   * @param type the primitive type of the column
+   */
+  private void bind(PrimitiveTypeName type) {
+    final PrimitiveTypeNameConverter<Binding, RuntimeException> bindingFactory =
+        new PrimitiveTypeNameConverter<Binding, RuntimeException>() {
+          @Override
+          public Binding convertFLOAT(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              float value;
+
+              @Override
+              void read() {
+                value = dataColumn.readFloat();
+              }
+
+              @Override
+              public void skip() {
+                value = 0;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addFloat(value);
+              }
+
+              @Override
+              public float getFloat() {
+                return value;
+              }
+            };
+          }
+
+          @Override
+          public Binding convertDOUBLE(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              double value;
+
+              @Override
+              void read() {
+                value = dataColumn.readDouble();
+              }
+
+              @Override
+              public void skip() {
+                value = 0;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addDouble(value);
+              }
+
+              @Override
+              public double getDouble() {
+                return value;
+              }
+            };
+          }
+
+          @Override
+          public Binding convertINT32(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              int value;
+
+              @Override
+              void read() {
+                value = dataColumn.readInteger();
+              }
+
+              @Override
+              public void skip() {
+                value = 0;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addInt(value);
+              }
+
+              @Override
+              public int getInteger() {
+                return value;
+              }
+            };
+          }
+
+          @Override
+          public Binding convertINT64(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              long value;
+
+              @Override
+              void read() {
+                value = dataColumn.readLong();
+              }
+
+              @Override
+              public void skip() {
+                value = 0;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addLong(value);
+              }
+
+              @Override
+              public long getLong() {
+                return value;
+              }
+            };
+          }
+
+          @Override
+          public Binding convertINT96(PrimitiveTypeName typeName) throws RuntimeException {
+            return convertBINARY(typeName);
+          }
+
+          @Override
+          public Binding convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName typeName) throws RuntimeException {
+            return convertBINARY(typeName);
+          }
+
+          @Override
+          public Binding convertBOOLEAN(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              boolean value;
+
+              @Override
+              void read() {
+                value = dataColumn.readBoolean();
+              }
+
+              @Override
+              public void skip() {
+                value = false;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addBoolean(value);
+              }
+
+              @Override
+              public boolean getBoolean() {
+                return value;
+              }
+            };
+          }
+
+          @Override
+          public Binding convertBINARY(PrimitiveTypeName typeName) throws RuntimeException {
+            return new Binding() {
+              Binary value;
+
+              @Override
+              void read() {
+                value = dataColumn.readBytes();
+              }
+
+              @Override
+              public void skip() {
+                value = null;
+                dataColumn.skip();
+              }
+
+              @Override
+              void writeValue() {
+                converter.addBinary(value);
+              }
+
+              @Override
+              public Binary getBinary() {
+                return value;
+              }
+            };
+          }
+        };
+    binding = type.convert(bindingFactory);
+  }
+
+ /**
+ * creates a reader for triplets
+ * @param path the descriptor for the corresponding column
+ * @param pageReader the underlying store to read from
+ */
+ public ColumnReaderImpl(ColumnDescriptor path, PageReader pageReader, PrimitiveConverter converter) {
+ this.path = checkNotNull(path, "path");
+ this.pageReader = checkNotNull(pageReader, "pageReader");
+ this.converter = checkNotNull(converter, "converter");
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
+ if (converter.hasDictionarySupport()) {
+ converter.setDictionary(dictionary);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
+ }
+ } else {
+ this.dictionary = null;
+ }
+ this.totalValueCount = pageReader.getTotalValueCount();
+ if (totalValueCount == 0) {
+ throw new ParquetDecodingException("totalValueCount == 0");
+ }
+ consume();
+ }
+
+ private boolean isFullyConsumed() {
+ return readValues >= totalValueCount;
+ }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read, then forwards it to the converter through the binding.
+   * @see org.apache.parquet.column.ColumnReader#writeCurrentValueToConverter()
+   */
+  @Override
+  public void writeCurrentValueToConverter() {
+    readValue();
+    binding.writeValue();
+  }
+
+  /**
+   * Makes sure the current value has been read, then asks the binding for its dictionary id.
+   *
+   * @return the dictionary id of the current value
+   */
+  @Override
+  public int getCurrentValueDictionaryID() {
+    readValue();
+    final int id = binding.getDictionaryId();
+    return id;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getInteger()
+   */
+  @Override
+  public int getInteger() {
+    readValue();
+    final int value = binding.getInteger();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getBoolean()
+   */
+  @Override
+  public boolean getBoolean() {
+    readValue();
+    final boolean value = binding.getBoolean();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getLong()
+   */
+  @Override
+  public long getLong() {
+    readValue();
+    final long value = binding.getLong();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getBinary()
+   */
+  @Override
+  public Binary getBinary() {
+    readValue();
+    final Binary value = binding.getBinary();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getFloat()
+   */
+  @Override
+  public float getFloat() {
+    readValue();
+    final float value = binding.getFloat();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Makes sure the current value has been read before asking the binding for it.
+   * @see org.apache.parquet.column.ColumnReader#getDouble()
+   */
+  @Override
+  public double getDouble() {
+    readValue();
+    final double value = binding.getDouble();
+    return value;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the repetition level stored for the current triplet.
+   * @see org.apache.parquet.column.ColumnReader#getCurrentRepetitionLevel()
+   */
+  @Override
+  public int getCurrentRepetitionLevel() {
+    return this.repetitionLevel;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the column descriptor this reader was created with.
+   * @see org.apache.parquet.column.ColumnReader#getDescriptor()
+   */
+  @Override
+  public ColumnDescriptor getDescriptor() {
+    return this.path;
+  }
+
+  /**
+   * Reads the value into the binding, unless the current value was already read or skipped.
+   *
+   * @throws ParquetDecodingException wrapping any RuntimeException raised while reading,
+   *         with the position of the value in the column and in the current page
+   */
+  public void readValue() {
+    if (valueRead) {
+      // the value may be requested twice when filtering: only decode it once
+      return;
+    }
+    try {
+      binding.read();
+      valueRead = true;
+    } catch (RuntimeException e) {
+      throw new ParquetDecodingException(
+          format(
+              "Can't read value in column %s at value %d out of %d, %d out of %d in currentPage. repetition level: %d, definition level: %d",
+              path, readValues, totalValueCount, readValues - (endOfPageValueCount - pageValueCount), pageValueCount, repetitionLevel, definitionLevel),
+          e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * Does nothing when the current value was already read or skipped.
+   * @see org.apache.parquet.column.ColumnReader#skip()
+   */
+  @Override
+  public void skip() {
+    if (valueRead) {
+      return;
+    }
+    binding.skip();
+    // mark the value as handled so that it is neither read nor skipped a second time
+    valueRead = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the definition level stored for the current triplet.
+   * @see org.apache.parquet.column.ColumnReader#getCurrentDefinitionLevel()
+   */
+  @Override
+  public int getCurrentDefinitionLevel() {
+    return this.definitionLevel;
+  }
+
+  // TODO: change the logic around read() to not tie together reading from the 3 columns
+  /**
+   * Reads the next repetition level, then the next definition level, and counts one more value as read.
+   */
+  private void readRepetitionAndDefinitionLevels() {
+    this.repetitionLevel = repetitionLevelColumn.nextInt();
+    this.definitionLevel = definitionLevelColumn.nextInt();
+    readValues += 1;
+  }
+
+ private void checkRead() {
+ if (isPageFullyConsumed()) {
+ if (isFullyConsumed()) {
+ if (DEBUG) LOG.debug("end reached");
+ repetitionLevel = 0; // the next repetition level
+ return;
+ }
+ readPage();
+ }
+ readRepetitionAndDefinitionLevels();
+ }
+
+ private void readPage() {
+ if (DEBUG) LOG.debug("loading page");
+ DataPage page = pageReader.readPage();
+ page.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = readValues + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ if (dictionary == null) {
+ throw new ParquetDecodingException(
+ "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
+ }
+ this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
+ } else {
+ this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
+ }
+ if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
+ bindToDictionary(dictionary);
+ } else {
+ bind(path.getType());
+ }
+ try {
+ dataColumn.initFromPage(pageValueCount, bytes, offset);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page in col " + path, e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
+ ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ byte[] bytes = page.getBytes().toByteArray();
+ if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+ if (DEBUG) LOG.debug("reading repetition levels at 0");
+ rlReader.initFromPage(pageValueCount, bytes, 0);
+ int next = rlReader.getNextOffset();
+ if (DEBUG) LOG.debug("reading definition levels at " + next);
+ dlReader.initFromPage(pageValueCount, bytes, next);
+ next = dlReader.getNextOffset();
+ if (DEBUG) LOG.debug("reading data at " + next);
+ initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) {
+ this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
+ this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ try {
+ if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read levels in page for col " + path, e);
+ }
+ }
+
+ private boolean isPageFullyConsumed() {
+ return readValues >= endOfPageValueCount;
+ }
+
+  /**
+   * {@inheritDoc}
+   * Moves to the next triplet and marks its value as not read yet.
+   * @see org.apache.parquet.column.ColumnReader#consume()
+   */
+  @Override
+  public void consume() {
+    checkRead();
+    this.valueRead = false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Returns the total value count obtained from the page reader at construction time.
+   * @see org.apache.parquet.column.ColumnReader#getTotalValueCount()
+   */
+  @Override
+  public long getTotalValueCount() {
+    return this.totalValueCount;
+  }
+
+  /**
+   * Minimal source of int values, used here to read repetition and definition levels.
+   */
+  abstract static class IntIterator {
+    /**
+     * @return the next int
+     */
+    abstract int nextInt();
+  }
+
+  /**
+   * {@link IntIterator} backed by a {@link ValuesReader}.
+   */
+  static class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    /**
+     * @param delegate the reader the ints are read from
+     */
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return the next integer of the underlying reader
+     */
+    @Override
+    int nextInt() {
+      final int next = delegate.readInteger();
+      return next;
+    }
+  }
+
+  /**
+   * {@link IntIterator} backed by a {@link RunLengthBitPackingHybridDecoder}.
+   */
+  static class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    /**
+     * @param delegate the decoder the ints are read from
+     */
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return the next int of the underlying decoder
+     * @throws ParquetDecodingException wrapping the IOException raised by the decoder
+     */
+    @Override
+    int nextInt() {
+      final int next;
+      try {
+        next = delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+      return next;
+    }
+  }
+
+  /**
+   * {@link IntIterator} that reads nothing and always answers 0.
+   */
+  private static final class NullIntIterator extends IntIterator {
+    /**
+     * @return always 0
+     */
+    @Override
+    int nextInt() {
+      final int constantLevel = 0;
+      return constantLevel;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
new file mode 100644
index 0000000..a72b6f7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+
+/**
+ * {@link ColumnWriteStore} that creates one {@link ColumnWriterV1} per column the first time
+ * the column is requested and aggregates size and memory information over those writers.
+ */
+public class ColumnWriteStoreV1 implements ColumnWriteStore {
+
+  private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
+  private final PageWriteStore pageWriteStore;
+  private final int pageSizeThreshold;
+  private final int dictionaryPageSizeThreshold;
+  private final boolean enableDictionary;
+  private final WriterVersion writerVersion;
+
+  /**
+   * @param pageWriteStore the store providing the page writer of each column
+   * @param pageSizeThreshold passed to every column writer created by this store
+   * @param dictionaryPageSizeThreshold passed to every column writer created by this store
+   * @param enableDictionary passed to every column writer created by this store
+   * @param writerVersion passed to every column writer created by this store
+   */
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+    this.pageWriteStore = pageWriteStore;
+    this.pageSizeThreshold = pageSizeThreshold;
+    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
+    this.enableDictionary = enableDictionary;
+    this.writerVersion = writerVersion;
+  }
+
+  /**
+   * @param path the column to write to
+   * @return the writer of the column, created and registered on first request
+   */
+  public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+    final ColumnWriterV1 existing = columns.get(path);
+    if (existing != null) {
+      return existing;
+    }
+    final ColumnWriterV1 created = newMemColumn(path);
+    columns.put(path, created);
+    return created;
+  }
+
+  /**
+   * @return the descriptors of the columns a writer was created for
+   */
+  public Set<ColumnDescriptor> getColumnDescriptors() {
+    return columns.keySet();
+  }
+
+  /**
+   * @param path the column to create a writer for
+   * @return a new writer bound to the page writer of the column
+   */
+  private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
+    return new ColumnWriterV1(
+        path,
+        pageWriteStore.getPageWriter(path),
+        pageSizeThreshold,
+        dictionaryPageSizeThreshold,
+        enableDictionary,
+        writerVersion);
+  }
+
+  /**
+   * @return one line per column: the path of the column followed by its buffered size in bytes
+   */
+  @Override
+  public String toString() {
+    final StringBuilder description = new StringBuilder();
+    for (Entry<ColumnDescriptor, ColumnWriterV1> column : columns.entrySet()) {
+      description
+          .append(Arrays.toString(column.getKey().getPath()))
+          .append(": ")
+          .append(column.getValue().getBufferedSizeInMemory())
+          .append(" bytes")
+          .append("\n");
+    }
+    return description.toString();
+  }
+
+  /**
+   * @return the sum of the allocated sizes of all the column writers
+   */
+  @Override
+  public long getAllocatedSize() {
+    long allocated = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      allocated += writer.allocatedSize();
+    }
+    return allocated;
+  }
+
+  /**
+   * @return the sum of the buffered sizes of all the column writers
+   */
+  @Override
+  public long getBufferedSize() {
+    long buffered = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      buffered += writer.getBufferedSizeInMemory();
+    }
+    return buffered;
+  }
+
+  /**
+   * @return the memory usage strings of all the column writers wrapped in a "Store { }" block
+   */
+  @Override
+  public String memUsageString() {
+    final StringBuilder usage = new StringBuilder("Store {\n");
+    for (ColumnWriterV1 writer : columns.values()) {
+      usage.append(writer.memUsageString(" "));
+    }
+    return usage.append("}\n").toString();
+  }
+
+  /**
+   * @return the largest buffered size among the column writers, 0 when there is none
+   */
+  public long maxColMemSize() {
+    long largest = 0;
+    for (ColumnWriterV1 writer : columns.values()) {
+      largest = Math.max(largest, writer.getBufferedSizeInMemory());
+    }
+    return largest;
+  }
+
+  /**
+   * flushes every column writer
+   */
+  @Override
+  public void flush() {
+    for (ColumnWriterV1 writer : columns.values()) {
+      writer.flush();
+    }
+  }
+
+  @Override
+  public void endRecord() {
+    // V1 does not take record boundaries into account
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
new file mode 100644
index 0000000..fc17a22
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.schema.MessageType;
+
+public class ColumnWriteStoreV2 implements ColumnWriteStore {
+
+ // will wait for at least that many records before checking again
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+ private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+ // will flush even if size bellow the threshold by this much to facilitate page alignment
+ private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
+
+ private final Map<ColumnDescriptor, ColumnWriterV2> columns;
+ private final Collection<ColumnWriterV2> writers;
+ private long rowCount;
+ private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ private final long thresholdTolerance;
+
+ private int pageSizeThreshold;
+
+ public ColumnWriteStoreV2(
+ MessageType schema,
+ PageWriteStore pageWriteStore,
+ int pageSizeThreshold,
+ ParquetProperties parquetProps) {
+ super();
+ this.pageSizeThreshold = pageSizeThreshold;
+ this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+ Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+ mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
+ }
+ this.columns = unmodifiableMap(mcolumns);
+ this.writers = this.columns.values();
+ }
+
+ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+ return columns.get(path);
+ }
+
+ public Set<ColumnDescriptor> getColumnDescriptors() {
+ return columns.keySet();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
+ sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+ sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ long total = 0;
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ total += memColumn.allocatedSize();
+ }
+ return total;
+ }
+
+ @Override
+ public long getBufferedSize() {
+ long total = 0;
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ total += memColumn.getTotalBufferedSize();
+ }
+ return total;
+ }
+
+ @Override
+ public void flush() {
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ long rows = rowCount - memColumn.getRowsWrittenSoFar();
+ if (rows > 0) {
+ memColumn.writePage(rowCount);
+ }
+ memColumn.finalizeColumnChunk();
+ }
+ }
+
+ public String memUsageString() {
+ StringBuilder b = new StringBuilder("Store {\n");
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ b.append(memColumn.memUsageString(" "));
+ }
+ b.append("}\n");
+ return b.toString();
+ }
+
+ @Override
+ public void endRecord() {
+ ++ rowCount;
+ if (rowCount >= rowCountForNextSizeCheck) {
+ sizeCheck();
+ }
+ }
+
+ private void sizeCheck() {
+ long minRecordToWait = Long.MAX_VALUE;
+ for (ColumnWriterV2 writer : writers) {
+ long usedMem = writer.getCurrentPageBufferedSize();
+ long rows = rowCount - writer.getRowsWrittenSoFar();
+ long remainingMem = pageSizeThreshold - usedMem;
+ if (remainingMem <= thresholdTolerance) {
+ writer.writePage(rowCount);
+ remainingMem = pageSizeThreshold;
+ }
+ long rowsToFillPage =
+ usedMem == 0 ?
+ MAXIMUM_RECORD_COUNT_FOR_CHECK
+ : (long)((float)rows) / usedMem * remainingMem;
+ if (rowsToFillPage < minRecordToWait) {
+ minRecordToWait = rowsToFillPage;
+ }
+ }
+ if (minRecordToWait == Long.MAX_VALUE) {
+ minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ }
+ // will check again halfway
+ rowCountForNextSizeCheck = rowCount +
+ min(
+ max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
+ MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
new file mode 100644
index 0000000..f4079c7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static org.apache.parquet.bytes.BytesInput.concat;
+
+import java.io.IOException;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+import static java.lang.Math.max;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV1 implements ColumnWriter {
+ private static final Log LOG = Log.getLog(ColumnWriterV1.class);
+ private static final boolean DEBUG = Log.DEBUG;
+ private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
+ private static final int MIN_SLAB_SIZE = 64;
+
+ private final ColumnDescriptor path;
+ private final PageWriter pageWriter;
+ private final long pageSizeThreshold;
+ private ValuesWriter repetitionLevelColumn;
+ private ValuesWriter definitionLevelColumn;
+ private ValuesWriter dataColumn;
+ private int valueCount;
+ private int valueCountForNextSizeCheck;
+
+ private Statistics statistics;
+
+ /**
+ * Creates a writer for one column and sets up its level and data writers.
+ *
+ * @param path the descriptor of the column being written
+ * @param pageWriter where the pages and the dictionary page are written
+ * @param pageSizeThreshold buffered size above which the current page is written; it is also
+ * passed to the level and data writers (presumably as a capacity hint - TODO confirm)
+ * @param dictionaryPageSizeThreshold passed to the ParquetProperties that provides the data writer
+ * @param enableDictionary passed to the ParquetProperties that provides the data writer
+ * @param writerVersion passed to the ParquetProperties that provides the data writer
+ */
+ public ColumnWriterV1(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ int pageSizeThreshold,
+ int dictionaryPageSizeThreshold,
+ boolean enableDictionary,
+ WriterVersion writerVersion) {
+ this.path = path;
+ this.pageWriter = pageWriter;
+ this.pageSizeThreshold = pageSizeThreshold;
+ // initial check of memory usage. So that we have enough data to make an initial prediction
+ this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
+ // resetStatistics() reads this.path, so it must run after path has been assigned
+ resetStatistics();
+
+ ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+
+ // one writer per level kind, obtained from the maximum level of the column
+ this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+ this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+
+ int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
+ this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
+ }
+
+ private void log(Object value, int r, int d) {
+ LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+ }
+
+ private void resetStatistics() {
+ this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+ }
+
+ /**
+ * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
+ *
+ * We measure the memory used when we reach the mid point toward our estimated count.
+ * We then update the estimate and flush the page if we reached the threshold.
+ *
+ * That way we check the memory size log2(n) times.
+ *
+ */
+ private void accountForValueWritten() {
+ ++ valueCount;
+ if (valueCount > valueCountForNextSizeCheck) {
+ // not checking the memory used for every value
+ long memSize = repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize();
+ if (memSize > pageSizeThreshold) {
+ // we will write the current page and check again the size at the predicted middle of next page
+ valueCountForNextSizeCheck = valueCount / 2;
+ writePage();
+ } else {
+ // not reached the threshold, will check again midway
+ valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
+ }
+ }
+ }
+
+ private void updateStatisticsNumNulls() {
+ statistics.incrementNumNulls();
+ }
+
+ private void updateStatistics(int value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(long value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(float value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(double value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(Binary value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(boolean value) {
+ statistics.updateStats(value);
+ }
+
+ /**
+ * Hands the buffered repetition levels, definition levels and values to the page writer as
+ * one page, then resets the three writers, the value count and the statistics so that the
+ * next page starts empty.
+ *
+ * @throws ParquetEncodingException if the page writer fails with an IOException
+ */
+ private void writePage() {
+ if (DEBUG) LOG.debug("write page");
+ try {
+ // the page content is the concatenation: repetition levels, definition levels, data
+ pageWriter.writePage(
+ concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
+ valueCount,
+ statistics,
+ repetitionLevelColumn.getEncoding(),
+ definitionLevelColumn.getEncoding(),
+ dataColumn.getEncoding());
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page for " + path, e);
+ }
+ // start the next page from a clean state
+ repetitionLevelColumn.reset();
+ definitionLevelColumn.reset();
+ dataColumn.reset();
+ valueCount = 0;
+ resetStatistics();
+ }
+
+ @Override
+ public void writeNull(int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(null, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ updateStatisticsNumNulls();
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(double value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeDouble(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeFloat(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(Binary value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeBytes(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(boolean value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeBoolean(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(int value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeInteger(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(long value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeLong(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+  /**
+   * Writes the values still buffered as a last page, then writes the dictionary page when the
+   * data writer provides one.
+   */
+  public void flush() {
+    if (valueCount > 0) {
+      writePage();
+    }
+    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    if (dictionaryPage == null) {
+      // no dictionary to write for this column
+      return;
+    }
+    if (DEBUG) {
+      LOG.debug("write dictionary");
+    }
+    try {
+      pageWriter.writeDictionaryPage(dictionaryPage);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+    }
+    dataColumn.resetDictionary();
+  }
+
+ public long getBufferedSizeInMemory() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize()
+ + pageWriter.getMemSize();
+ }
+
+ public long allocatedSize() {
+ return repetitionLevelColumn.getAllocatedSize()
+ + definitionLevelColumn.getAllocatedSize()
+ + dataColumn.getAllocatedSize()
+ + pageWriter.allocatedSize();
+ }
+
+  /**
+   * @param indent a prefix to format lines
+   * @return a formatted string showing how memory is used by the levels, the data and the pages
+   */
+  public String memUsageString(String indent) {
+    final StringBuilder usage = new StringBuilder(indent);
+    usage.append(path).append(" {\n");
+    final String repetitionUsage = repetitionLevelColumn.memUsageString(indent + " r:");
+    usage.append(repetitionUsage).append("\n");
+    final String definitionUsage = definitionLevelColumn.memUsageString(indent + " d:");
+    usage.append(definitionUsage).append("\n");
+    final String dataUsage = dataColumn.memUsageString(indent + " data:");
+    usage.append(dataUsage).append("\n");
+    final String pagesUsage = pageWriter.memUsageString(indent + " pages:");
+    usage.append(pagesUsage).append("\n");
+    final String total = String.format(" total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize());
+    usage.append(indent).append(total).append("\n");
+    usage.append(indent).append("}\n");
+    return usage.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
new file mode 100644
index 0000000..5e936a2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static java.lang.Math.max;
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
+import java.io.IOException;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV2 implements ColumnWriter {
+ private static final Log LOG = Log.getLog(ColumnWriterV2.class);
+ private static final boolean DEBUG = Log.DEBUG;
+ private static final int MIN_SLAB_SIZE = 64;
+
+ private final ColumnDescriptor path;
+ private final PageWriter pageWriter;
+ private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
+ private RunLengthBitPackingHybridEncoder definitionLevelColumn;
+ private ValuesWriter dataColumn;
+ private int valueCount;
+
+ private Statistics<?> statistics;
+ private long rowsWrittenSoFar = 0;
+
+ /**
+ * Creates a writer for one column and sets up its level encoders and data writer.
+ *
+ * @param path the descriptor of the column being written
+ * @param pageWriter where the pages and the dictionary page are written
+ * @param parquetProps provides the values writer used for the data of this column
+ * @param pageSize passed to the level encoders and the data writer
+ * (presumably as a capacity hint - TODO confirm)
+ */
+ public ColumnWriterV2(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ ParquetProperties parquetProps,
+ int pageSize) {
+ this.path = path;
+ this.pageWriter = pageWriter;
+ // resetStatistics() reads this.path, so it must run after path has been assigned
+ resetStatistics();
+
+ // the bit width of each level encoder is derived from the maximum level of the column
+ this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
+ this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
+
+ int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
+ this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
+ }
+
+ private void log(Object value, int r, int d) {
+ LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+ }
+
+ private void resetStatistics() {
+ this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+ }
+
+ private void definitionLevel(int definitionLevel) {
+ try {
+ definitionLevelColumn.writeInt(definitionLevel);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
+ }
+ }
+
+ private void repetitionLevel(int repetitionLevel) {
+ try {
+ repetitionLevelColumn.writeInt(repetitionLevel);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
+ }
+ }
+
+ /**
+ * writes the current null value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void writeNull(int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(null, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ statistics.incrementNumNulls();
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(double value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeDouble(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeFloat(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(Binary value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBytes(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(boolean value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBoolean(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(int value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeInteger(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(long value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeLong(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+  /**
+   * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
+   * Is called right after writePage
+   */
+  public void finalizeColumnChunk() {
+    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    if (dictionaryPage == null) {
+      // the data writer has no dictionary: nothing more to add to the chunk
+      return;
+    }
+    if (DEBUG) {
+      LOG.debug("write dictionary");
+    }
+    try {
+      pageWriter.writeDictionaryPage(dictionaryPage);
+    } catch (IOException e) {
+      throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+    }
+    dataColumn.resetDictionary();
+  }
+
+ /**
+ * used to decide when to write a page
+ * @return the number of bytes of memory used to buffer the current data
+ */
+ public long getCurrentPageBufferedSize() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize();
+ }
+
+ /**
+ * used to decide when to write a page or row group
+ * @return the number of bytes of memory used to buffer the current data and the previously written pages
+ */
+ public long getTotalBufferedSize() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize()
+ + pageWriter.getMemSize();
+ }
+
+ /**
+ * @return actual memory used
+ */
+ public long allocatedSize() {
+ return repetitionLevelColumn.getAllocatedSize()
+ + definitionLevelColumn.getAllocatedSize()
+ + dataColumn.getAllocatedSize()
+ + pageWriter.allocatedSize();
+ }
+
+ /**
+ * @param indent a prefix to format lines
+ * @return a formatted string showing how memory is used
+ */
+ public String memUsageString(String indent) {
+ StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+ // the level encoders only report their allocated size
+ b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+ b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+ b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
+ b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
+ // total is printed as buffered/allocated
+ b.append(indent).append(String.format(" total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
+ b.append(indent).append("}\n");
+ return b.toString();
+ }
+
+ public long getRowsWrittenSoFar() {
+ return this.rowsWrittenSoFar;
+ }
+
+ /**
+ * writes the current data to a new page in the page store, then resets the level encoders,
+ * the data writer, the value count and the statistics for the next page
+ * @param rowCount how many rows have been written so far
+ * @throws ParquetEncodingException if the page writer fails with an IOException
+ */
+ public void writePage(long rowCount) {
+ // rows of this page = rows written so far minus the rows already accounted for by earlier pages
+ int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
+ // NOTE(review): the counter is advanced before the write is attempted, so it stays advanced
+ // if writePageV2 throws
+ this.rowsWrittenSoFar = rowCount;
+ if (DEBUG) LOG.debug("write page");
+ try {
+ // TODO: rework this API. Those must be called *in that order*
+ BytesInput bytes = dataColumn.getBytes();
+ Encoding encoding = dataColumn.getEncoding();
+ pageWriter.writePageV2(
+ pageRowCount,
+ Ints.checkedCast(statistics.getNumNulls()),
+ valueCount,
+ // levels are passed as empty when the maximum level of the column is 0
+ path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
+ path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
+ encoding,
+ bytes,
+ statistics
+ );
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page for " + path, e);
+ }
+ // start the next page from a clean state
+ repetitionLevelColumn.reset();
+ definitionLevelColumn.reset();
+ dataColumn.reset();
+ valueCount = 0;
+ resetStatistics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
new file mode 100644
index 0000000..9f11490
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * one data page in a chunk
+ *
+ * Concrete pages are dispatched on through {@link Visitor}.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class DataPage extends Page {
+
+  private final int valueCount;
+
+  /**
+   * @param compressedSize passed to the parent page as its compressed size
+   * @param uncompressedSize passed to the parent page as its uncompressed size
+   * @param valueCount the number of values in that page
+   */
+  DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+    super(compressedSize, uncompressedSize);
+    this.valueCount = valueCount;
+  }
+
+  /**
+   * @return the number of values in that page
+   */
+  public int getValueCount() {
+    return this.valueCount;
+  }
+
+  /**
+   * Calls the visit method of the visitor that matches the concrete type of this page.
+   *
+   * @param visitor the visitor to call
+   * @param <T> the type returned by the visitor
+   * @return what the visitor returned
+   */
+  public abstract <T> T accept(Visitor<T> visitor);
+
+  /**
+   * One method per concrete kind of data page.
+   *
+   * @param <T> the type returned by the visit methods
+   */
+  public interface Visitor<T> {
+
+    T visit(DataPageV1 dataPageV1);
+
+    T visit(DataPageV2 dataPageV2);
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
new file mode 100644
index 0000000..2206517
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV1.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+/**
+ * A data page holding a single bytes payload together with the encodings of its
+ * repetition levels, definition levels and values.
+ */
+public class DataPageV1 extends DataPage {
+
+ private final BytesInput bytes;
+ private final Statistics<?> statistics;
+ private final Encoding rlEncoding;
+ private final Encoding dlEncoding;
+ private final Encoding valuesEncoding;
+
+ /**
+ * @param bytes the bytes for this page; their size is passed to the parent as the compressed size
+ * @param valueCount count of values in this page
+ * @param uncompressedSize the uncompressed size of the page
+ * @param stats statistics of the page's values (max, min, num_null)
+ * @param rlEncoding the repetition level encoding for this page
+ * @param dlEncoding the definition level encoding for this page
+ * @param valuesEncoding the values encoding for this page
+ */
+ public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+ super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
+ this.bytes = bytes;
+ this.statistics = stats;
+ this.rlEncoding = rlEncoding;
+ this.dlEncoding = dlEncoding;
+ this.valuesEncoding = valuesEncoding;
+ }
+
+ /**
+ * @return the bytes for the page
+ */
+ public BytesInput getBytes() {
+ return bytes;
+ }
+
+ /**
+ *
+ * @return the statistics for this page (max, min, num_nulls)
+ */
+ public Statistics<?> getStatistics() {
+ return statistics;
+ }
+
+ /**
+ * @return the definition level encoding for this page
+ */
+ public Encoding getDlEncoding() {
+ return dlEncoding;
+ }
+
+ /**
+ * @return the repetition level encoding for this page
+ */
+ public Encoding getRlEncoding() {
+ return rlEncoding;
+ }
+
+ /**
+ * @return the values encoding for this page
+ */
+ public Encoding getValueEncoding() {
+ return valuesEncoding;
+ }
+
+ /**
+ * @return a summary with the size of the bytes, the value count and the uncompressed size
+ */
+ @Override
+ public String toString() {
+ return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
+ }
+
+ /**
+ * Calls the DataPageV1 overload of the visitor.
+ */
+ @Override
+ public <T> T accept(Visitor<T> visitor) {
+ return visitor.visit(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
new file mode 100644
index 0000000..13b64c3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+public class DataPageV2 extends DataPage {
+
+ /**
+ * Creates a page flagged as not compressed. The uncompressed size is computed here as the
+ * sum of the sizes of the repetition levels, the definition levels and the data.
+ *
+ * @param rowCount the row count of the page
+ * @param nullCount the null count of the page
+ * @param valueCount the value count of the page
+ * @param repetitionLevels RLE encoded repetition levels
+ * @param definitionLevels RLE encoded definition levels
+ * @param dataEncoding encoding for the data
+ * @param data data encoded with dataEncoding
+ * @param statistics optional statistics for this page
+ * @return an uncompressed page
+ */
+ public static DataPageV2 uncompressed(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ Statistics<?> statistics) {
+ return new DataPageV2(
+ rowCount, nullCount, valueCount,
+ repetitionLevels, definitionLevels,
+ dataEncoding, data,
+ Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+ statistics,
+ false);
+ }
+
+ /**
+ * Creates a page flagged as compressed. The uncompressed size cannot be derived from the
+ * given bytes, so the caller provides it.
+ *
+ * @param rowCount the row count of the page
+ * @param nullCount the null count of the page
+ * @param valueCount the value count of the page
+ * @param repetitionLevels RLE encoded repetition levels
+ * @param definitionLevels RLE encoded definition levels
+ * @param dataEncoding encoding for the data
+ * @param data data encoded with dataEncoding and compressed
+ * @param uncompressedSize total size uncompressed (rl + dl + data)
+ * @param statistics optional statistics for this page
+ * @return a compressed page
+ */
+ public static DataPageV2 compressed(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ int uncompressedSize,
+ Statistics<?> statistics) {
+ return new DataPageV2(
+ rowCount, nullCount, valueCount,
+ repetitionLevels, definitionLevels,
+ dataEncoding, data,
+ uncompressedSize,
+ statistics,
+ true);
+ }
+
+ private final int rowCount;
+ private final int nullCount;
+ private final BytesInput repetitionLevels;
+ private final BytesInput definitionLevels;
+ private final Encoding dataEncoding;
+ private final BytesInput data;
+ private final Statistics<?> statistics;
+ private final boolean isCompressed;
+
+ /**
+ * Creates a page from its parts. The size passed to the parent as the compressed size is
+ * the sum of the sizes of the repetition levels, the definition levels and the data.
+ *
+ * @param rowCount the row count of the page
+ * @param nullCount the null count of the page
+ * @param valueCount the value count of the page
+ * @param repetitionLevels RLE encoded repetition levels
+ * @param definitionLevels RLE encoded definition levels
+ * @param dataEncoding encoding for the data
+ * @param data data encoded with dataEncoding, compressed or not according to isCompressed
+ * @param uncompressedSize total size uncompressed (rl + dl + data)
+ * @param statistics optional statistics for this page
+ * @param isCompressed whether the data is compressed
+ */
+ public DataPageV2(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ int uncompressedSize,
+ Statistics<?> statistics,
+ boolean isCompressed) {
+ super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
+ this.rowCount = rowCount;
+ this.nullCount = nullCount;
+ this.repetitionLevels = repetitionLevels;
+ this.definitionLevels = definitionLevels;
+ this.dataEncoding = dataEncoding;
+ this.data = data;
+ this.statistics = statistics;
+ this.isCompressed = isCompressed;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ public int getNullCount() {
+ return nullCount;
+ }
+
+ public BytesInput getRepetitionLevels() {
+ return repetitionLevels;
+ }
+
+ public BytesInput getDefinitionLevels() {
+ return definitionLevels;
+ }
+
+ public Encoding getDataEncoding() {
+ return dataEncoding;
+ }
+
+ public BytesInput getData() {
+ return data;
+ }
+
+ public Statistics<?> getStatistics() {
+ return statistics;
+ }
+
+ public boolean isCompressed() {
+ return isCompressed;
+ }
+
+ @Override
+ public <T> T accept(Visitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+  /**
+   * @return a summary of the page: sizes of the levels and data, data encoding, value and row
+   * counts, compression flag and uncompressed size
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("Page V2 [");
+    text.append("dl size=").append(definitionLevels.size()).append(", ");
+    text.append("rl size=").append(repetitionLevels.size()).append(", ");
+    text.append("data size=").append(data.size()).append(", ");
+    text.append("data enc=").append(dataEncoding).append(", ");
+    text.append("valueCount=").append(getValueCount()).append(", ");
+    text.append("rowCount=").append(getRowCount()).append(", ");
+    text.append("is compressed=").append(isCompressed).append(", ");
+    text.append("uncompressedSize=").append(getUncompressedSize()).append("]");
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
new file mode 100644
index 0000000..306d81b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+
+/**
+ * Data for a dictionary page
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class DictionaryPage extends Page {
+
+  private final BytesInput bytes;
+  private final int dictionarySize;
+  private final Encoding encoding;
+
+  /**
+   * creates an uncompressed page
+   * @param bytes the content of the page (checked with checkNotNull)
+   * @param dictionarySize the value count in the dictionary
+   * @param encoding the encoding used (checked with checkNotNull)
+   */
+  public DictionaryPage(BytesInput bytes, int dictionarySize, Encoding encoding) {
+    // The page is uncompressed, so its uncompressed size is the size of the bytes.
+    // Use the same checked narrowing as the compressed size instead of a raw (int) cast.
+    // TODO: fix sizes long or int
+    this(bytes, Ints.checkedCast(checkNotNull(bytes, "bytes").size()), dictionarySize, encoding);
+  }
+
+  /**
+   * creates a dictionary page
+   * @param bytes the (possibly compressed) content of the page (checked with checkNotNull)
+   * @param uncompressedSize the size uncompressed
+   * @param dictionarySize the value count in the dictionary
+   * @param encoding the encoding used (checked with checkNotNull)
+   */
+  public DictionaryPage(BytesInput bytes, int uncompressedSize, int dictionarySize, Encoding encoding) {
+    // The null check has to sit inside the super(...) arguments: they are evaluated
+    // before any statement of this constructor body, so a later check could never fire.
+    super(Ints.checkedCast(checkNotNull(bytes, "bytes").size()), uncompressedSize);
+    this.bytes = bytes;
+    this.dictionarySize = dictionarySize;
+    this.encoding = checkNotNull(encoding, "encoding");
+  }
+
+  /**
+   * @return the content of the page as given at construction
+   */
+  public BytesInput getBytes() {
+    return bytes;
+  }
+
+  /**
+   * @return the value count in the dictionary
+   */
+  public int getDictionarySize() {
+    return dictionarySize;
+  }
+
+  /**
+   * @return the encoding used
+   */
+  public Encoding getEncoding() {
+    return encoding;
+  }
+
+  /**
+   * Creates a new page from BytesInput.copy of the bytes, keeping the same
+   * uncompressed size, dictionary size and encoding.
+   *
+   * @return the new page
+   * @throws IOException propagated from BytesInput.copy
+   */
+  public DictionaryPage copy() throws IOException {
+    return new DictionaryPage(BytesInput.copy(bytes), getUncompressedSize(), dictionarySize, encoding);
+  }
+
+  @Override
+  public String toString() {
+    return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + getUncompressedSize() + ", encoding=" + encoding + "]";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
new file mode 100644
index 0000000..3c6b012
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * one page in a chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class Page {
+
+  private final int compressedSize;
+  private final int uncompressedSize;
+
+  /**
+   * @param compressedSize the value reported by {@link #getCompressedSize()}
+   * @param uncompressedSize the value reported by {@link #getUncompressedSize()}
+   */
+  Page(int compressedSize, int uncompressedSize) {
+    // the implicit superclass constructor call is sufficient here
+    this.uncompressedSize = uncompressedSize;
+    this.compressedSize = compressedSize;
+  }
+
+  /**
+   * @return the compressed size given at construction
+   */
+  public int getCompressedSize() {
+    return this.compressedSize;
+  }
+
+  /**
+   * @return the uncompressed size of the page when the bytes are compressed
+   */
+  public int getUncompressedSize() {
+    return this.uncompressedSize;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
new file mode 100644
index 0000000..3cfe624
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * contains all the readers for all the columns of the corresponding row group
+ *
+ * TODO: rename to RowGroup?
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageReadStore {
+
+ /**
+ * Looks up the page reader of one column.
+ *
+ * @param descriptor the descriptor of the column
+ * @return the page reader for that column
+ */
+ PageReader getPageReader(ColumnDescriptor descriptor);
+
+ /**
+ * Gives the size of the row group in rows.
+ *
+ * @return the total number of rows in that row group
+ */
+ long getRowCount();
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
new file mode 100644
index 0000000..94c9cb7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+/**
+ * Reader for a sequence of pages from a given column chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageReader {
+
+ /**
+ * @return the dictionary page in that chunk or null if none
+ */
+ DictionaryPage readDictionaryPage();
+
+ /**
+ * @return the total number of values in the column chunk
+ */
+ long getTotalValueCount();
+
+ /**
+ * @return the next page in that chunk or null if after the last page
+ */
+ DataPage readPage();
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
new file mode 100644
index 0000000..2de9db9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * contains all the writers for the columns in the corresponding row group
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageWriteStore {
+
+ /**
+ * Looks up the page writer of one column.
+ *
+ * @param path the descriptor for the column
+ * @return the corresponding page writer
+ */
+ PageWriter getPageWriter(ColumnDescriptor path);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
new file mode 100644
index 0000000..4ad7d9f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+
+/**
+ * a writer for all the pages of a given column chunk
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface PageWriter {
+
+ /**
+ * writes a single page
+ * @param bytesInput the bytes for the page
+ * @param valueCount the number of values in that page
+ * @param statistics the statistics for that page
+ * @param rlEncoding repetition level encoding
+ * @param dlEncoding definition level encoding
+ * @param valuesEncoding values encoding
+ * @throws IOException
+ */
+ void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+
+ /**
+ * writes a single page in the new format
+ * @param rowCount the number of rows in this page
+ * @param nullCount the number of null values (out of valueCount)
+ * @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
+ * @param repetitionLevels the repetition levels encoded in RLE without any size header
+ * @param definitionLevels the definition levels encoded in RLE without any size header
+ * @param dataEncoding the encoding for the data
+ * @param data the data encoded with dataEncoding
+ * @param statistics optional stats for this page
+ * @throws IOException
+ */
+ void writePageV2(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput data,
+ Statistics<?> statistics) throws IOException;
+
+ /**
+ * @return the current size used in the memory buffer for that column chunk
+ */
+ long getMemSize();
+
+ /**
+ * @return the allocated size for the buffer ( > getMemSize() )
+ */
+ long allocatedSize();
+
+ /**
+ * writes a dictionary page
+ * @param dictionaryPage the dictionary page containing the dictionary data
+ */
+ void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException;
+
+ /**
+ * @param prefix a prefix header to add at every line
+ * @return a string presenting a summary of how memory is used
+ */
+ String memUsageString(String prefix);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
new file mode 100644
index 0000000..b2d9b55
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Statistics that keep the min and max {@link Binary} values seen so far,
+ * compared with {@link Binary#compareTo}.
+ */
+public class BinaryStatistics extends Statistics<Binary> {
+
+  private Binary max;
+  private Binary min;
+
+  /**
+   * Records one value: initializes min and max with it when no non-null value
+   * has been seen yet, otherwise widens the current range.
+   *
+   * @param value the value to record
+   */
+  @Override
+  public void updateStats(Binary value) {
+    if (!this.hasNonNullValue()) {
+      initializeStats(value, value);
+    } else {
+      updateStats(value, value);
+    }
+  }
+
+  /**
+   * Folds the min and max of another statistics object into this one.
+   *
+   * @param stats the statistics to merge; it is cast to BinaryStatistics
+   */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    BinaryStatistics binaryStats = (BinaryStatistics)stats;
+    if (!this.hasNonNullValue()) {
+      initializeStats(binaryStats.getMin(), binaryStats.getMax());
+    } else {
+      updateStats(binaryStats.getMin(), binaryStats.getMax());
+    }
+  }
+
+  /**
+   * Sets min and max from their byte representation and marks the statistics as not empty.
+   *
+   * @param minBytes the bytes of the min value
+   * @param maxBytes the bytes of the max value
+   */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    max = Binary.fromByteArray(maxBytes);
+    min = Binary.fromByteArray(minBytes);
+    this.markAsNotEmpty();
+  }
+
+  /**
+   * @return the bytes of the max value, or null when no max has been recorded yet
+   */
+  @Override
+  public byte[] getMaxBytes() {
+    // max stays null until a value is recorded; do not dereference it blindly
+    return max == null ? null : max.getBytes();
+  }
+
+  /**
+   * @return the bytes of the min value, or null when no min has been recorded yet
+   */
+  @Override
+  public byte[] getMinBytes() {
+    // min stays null until a value is recorded; do not dereference it blindly
+    return min == null ? null : min.getBytes();
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue()) {
+      return String.format("min: %s, max: %s, num_nulls: %d", min.toStringUsingUTF8(), max.toStringUsingUTF8(), this.getNumNulls());
+    } else if (!this.isEmpty()) {
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    } else {
+      return "no stats for this column";
+    }
+  }
+
+  /**
+   * Widens the current range: lowers min to min_value and raises max to max_value
+   * when they fall outside it. Expects min and max to be already initialized.
+   *
+   * @param min_value candidate for the new min
+   * @param max_value candidate for the new max
+   */
+  public void updateStats(Binary min_value, Binary max_value) {
+    if (min.compareTo(min_value) > 0) { min = min_value; }
+    if (max.compareTo(max_value) < 0) { max = max_value; }
+  }
+
+  /**
+   * Sets min and max unconditionally and marks the statistics as not empty.
+   *
+   * @param min_value the initial min
+   * @param max_value the initial max
+   */
+  public void initializeStats(Binary min_value, Binary max_value) {
+    min = min_value;
+    max = max_value;
+    this.markAsNotEmpty();
+  }
+
+  @Override
+  public Binary genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Binary genericGetMax() {
+    return max;
+  }
+
+  /**
+   * @return the current max, null if none has been recorded
+   */
+  public Binary getMax() {
+    return max;
+  }
+
+  /**
+   * @return the current min, null if none has been recorded
+   */
+  public Binary getMin() {
+    return min;
+  }
+
+  /**
+   * Replaces min and max and marks the statistics as not empty.
+   *
+   * @param min the new min
+   * @param max the new max
+   */
+  public void setMinMax(Binary min, Binary max) {
+    this.max = max;
+    this.min = min;
+    this.markAsNotEmpty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
new file mode 100644
index 0000000..1d02c74
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Statistics that keep the min and max boolean values seen so far
+ * (false orders before true).
+ */
+public class BooleanStatistics extends Statistics<Boolean> {
+
+  private boolean max;
+  private boolean min;
+
+  /**
+   * Records one value, either widening the current range or starting it.
+   *
+   * @param value the value to record
+   */
+  @Override
+  public void updateStats(boolean value) {
+    if (this.hasNonNullValue()) {
+      updateStats(value, value);
+    } else {
+      initializeStats(value, value);
+    }
+  }
+
+  /**
+   * Folds the min and max of another statistics object into this one.
+   *
+   * @param stats the statistics to merge; it is cast to BooleanStatistics
+   */
+  @Override
+  public void mergeStatisticsMinMax(Statistics stats) {
+    final BooleanStatistics other = (BooleanStatistics) stats;
+    if (this.hasNonNullValue()) {
+      updateStats(other.getMin(), other.getMax());
+    } else {
+      initializeStats(other.getMin(), other.getMax());
+    }
+  }
+
+  /**
+   * Sets min and max from their byte representation and marks the statistics as not empty.
+   *
+   * @param minBytes the bytes of the min value
+   * @param maxBytes the bytes of the max value
+   */
+  @Override
+  public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
+    // max is decoded before min, as in the original statement order
+    this.max = BytesUtils.bytesToBool(maxBytes);
+    this.min = BytesUtils.bytesToBool(minBytes);
+    markAsNotEmpty();
+  }
+
+  /**
+   * @return the max value converted with BytesUtils.booleanToBytes
+   */
+  @Override
+  public byte[] getMaxBytes() {
+    final byte[] encodedMax = BytesUtils.booleanToBytes(max);
+    return encodedMax;
+  }
+
+  /**
+   * @return the min value converted with BytesUtils.booleanToBytes
+   */
+  @Override
+  public byte[] getMinBytes() {
+    final byte[] encodedMin = BytesUtils.booleanToBytes(min);
+    return encodedMin;
+  }
+
+  @Override
+  public String toString() {
+    if (this.hasNonNullValue()) {
+      return String.format("min: %b, max: %b, num_nulls: %d", min, max, this.getNumNulls());
+    }
+    if (!this.isEmpty()) {
+      return String.format("num_nulls: %d, min/max not defined", this.getNumNulls());
+    }
+    return "no stats for this column";
+  }
+
+  /**
+   * Widens the current range: min drops to false once a false min_value is seen,
+   * max rises to true once a true max_value is seen.
+   *
+   * @param min_value candidate for the new min
+   * @param max_value candidate for the new max
+   */
+  public void updateStats(boolean min_value, boolean max_value) {
+    min = min && min_value;
+    max = max || max_value;
+  }
+
+  /**
+   * Sets min and max unconditionally and marks the statistics as not empty.
+   *
+   * @param min_value the initial min
+   * @param max_value the initial max
+   */
+  public void initializeStats(boolean min_value, boolean max_value) {
+    this.min = min_value;
+    this.max = max_value;
+    markAsNotEmpty();
+  }
+
+  @Override
+  public Boolean genericGetMin() {
+    return this.min;
+  }
+
+  @Override
+  public Boolean genericGetMax() {
+    return this.max;
+  }
+
+  /**
+   * @return the current max
+   */
+  public boolean getMax() {
+    return this.max;
+  }
+
+  /**
+   * @return the current min
+   */
+  public boolean getMin() {
+    return this.min;
+  }
+
+  /**
+   * Replaces min and max and marks the statistics as not empty.
+   *
+   * @param min the new min
+   * @param max the new max
+   */
+  public void setMinMax(boolean min, boolean max) {
+    this.max = max;
+    this.min = min;
+    markAsNotEmpty();
+  }
+}