You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ti...@apache.org on 2014/12/04 22:16:26 UTC
[3/3] incubator-parquet-mr git commit: PARQUET-117: implement the new
page format for Parquet 2.0
PARQUET-117: implement the new page format for Parquet 2.0
The new page format was defined some time ago:
https://github.com/Parquet/parquet-format/pull/64
https://github.com/Parquet/parquet-format/issues/44
The goals are the following:
- cut pages on record boundaries to facilitate skipping pages during predicate push-down
- read repetition levels (rl) and definition levels (dl) independently of the data
- optionally leave the data uncompressed
Author: julien <ju...@twitter.com>
Closes #75 from julienledem/new_page_format and squashes the following commits:
fbbc23a [julien] make mvn install display output only if it fails
4189383 [julien] save output lines as travis cuts after 10000
44d3684 [julien] fix parquet-tools for new page format
0fb8c15 [julien] Merge branch 'master' into new_page_format
5880cbb [julien] Merge branch 'master' into new_page_format
6ee7303 [julien] make parquet.column package not semver compliant
42f6c9f [julien] add tests and fix bugs
266302b [julien] fix write path
4e76369 [julien] read path
050a487 [julien] fix compilation
e0e9d00 [julien] better ColumnWriterStore definition
ecf04ce [julien] remove unnecessary change
2bc4d01 [julien] first stab at write path for the new page format
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/ccc29e4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/ccc29e4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/ccc29e4d
Branch: refs/heads/master
Commit: ccc29e4dde24584118211f27c71bb01bacc39326
Parents: b5f6a3b
Author: julien <ju...@twitter.com>
Authored: Thu Dec 4 13:16:11 2014 -0800
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Thu Dec 4 13:16:11 2014 -0800
----------------------------------------------------------------------
.travis.yml | 4 +-
.../java/parquet/column/ColumnWriteStore.java | 22 ++
.../main/java/parquet/column/ColumnWriter.java | 12 -
.../java/parquet/column/ParquetProperties.java | 25 ++
.../parquet/column/impl/ColumnReaderImpl.java | 143 +++++++--
.../column/impl/ColumnWriteStoreImpl.java | 127 --------
.../parquet/column/impl/ColumnWriteStoreV1.java | 134 +++++++++
.../parquet/column/impl/ColumnWriteStoreV2.java | 163 ++++++++++
.../parquet/column/impl/ColumnWriterImpl.java | 275 -----------------
.../parquet/column/impl/ColumnWriterV1.java | 269 +++++++++++++++++
.../parquet/column/impl/ColumnWriterV2.java | 295 +++++++++++++++++++
.../main/java/parquet/column/page/DataPage.java | 50 ++++
.../java/parquet/column/page/DataPageV1.java | 80 +++++
.../java/parquet/column/page/DataPageV2.java | 138 +++++++++
.../parquet/column/page/DictionaryPage.java | 14 +-
.../src/main/java/parquet/column/page/Page.java | 136 +--------
.../java/parquet/column/page/PageReader.java | 2 +-
.../java/parquet/column/page/PageWriter.java | 34 ++-
.../parquet/column/values/ValuesWriter.java | 3 +-
.../rle/RunLengthBitPackingHybridDecoder.java | 7 +-
.../RunLengthBitPackingHybridValuesWriter.java | 10 +-
.../main/java/parquet/io/MessageColumnIO.java | 3 +
.../column/impl/TestColumnReaderImpl.java | 105 +++++++
.../java/parquet/column/mem/TestMemColumn.java | 27 +-
.../parquet/column/mem/TestMemPageStore.java | 4 +-
.../parquet/column/page/mem/MemPageReader.java | 10 +-
.../parquet/column/page/mem/MemPageStore.java | 4 +-
.../parquet/column/page/mem/MemPageWriter.java | 29 +-
.../src/test/java/parquet/io/PerfTest.java | 6 +-
.../src/test/java/parquet/io/TestColumnIO.java | 166 ++++++-----
.../src/test/java/parquet/io/TestFiltered.java | 4 +-
.../converter/ParquetMetadataConverter.java | 51 +++-
.../hadoop/ColumnChunkPageReadStore.java | 72 +++--
.../hadoop/ColumnChunkPageWriteStore.java | 89 +++---
.../hadoop/InternalParquetRecordWriter.java | 28 +-
.../java/parquet/hadoop/ParquetFileReader.java | 74 +++--
.../hadoop/TestColumnChunkPageWriteStore.java | 107 +++++++
.../parquet/hadoop/TestParquetFileWriter.java | 7 +-
.../hadoop/TestParquetWriterNewPage.java | 117 ++++++++
.../java/parquet/pig/GenerateIntTestFile.java | 142 ---------
.../src/test/java/parquet/pig/GenerateTPCH.java | 112 -------
.../java/parquet/pig/TupleConsumerPerfTest.java | 8 +-
.../parquet/thrift/TestParquetReadProtocol.java | 4 +-
.../java/parquet/tools/command/DumpCommand.java | 34 ++-
pom.xml | 2 +-
45 files changed, 2052 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a75ac3a..ae33a7b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@ before_install:
- mkdir protobuf_install
- pushd protobuf_install
- wget http://protobuf.googlecode.com/files/protobuf-2.5.0.tar.gz
- - tar xzvf protobuf-2.5.0.tar.gz
+ - tar xzf protobuf-2.5.0.tar.gz
- cd protobuf-2.5.0
- ./configure
- make
@@ -27,5 +27,5 @@ env:
- HADOOP_PROFILE=default
- HADOOP_PROFILE=hadoop-2
-install: mvn install -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true
+install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false)
script: mvn test -P $HADOOP_PROFILE
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
index 9abff4d..4b8121f 100644
--- a/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
+++ b/parquet-column/src/main/java/parquet/column/ColumnWriteStore.java
@@ -33,4 +33,26 @@ public interface ColumnWriteStore {
*/
abstract public void flush();
+ /**
+ * called to notify of record boundaries
+ */
+ abstract public void endRecord();
+
+ /**
+ * used for information
+ * @return approximate size used in memory
+ */
+ abstract public long getAllocatedSize();
+
+ /**
+ * used to flush row groups to disk
+ * @return approximate size of the buffered encoded binary data
+ */
+ abstract public long getBufferedSize();
+
+ /**
+ * used for debugging purposes
+ * @return a formatted string representing memory usage per column
+ */
+ abstract public String memUsageString();
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ColumnWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/parquet/column/ColumnWriter.java
index 1fd7bba..702fe26 100644
--- a/parquet-column/src/main/java/parquet/column/ColumnWriter.java
+++ b/parquet-column/src/main/java/parquet/column/ColumnWriter.java
@@ -15,7 +15,6 @@
*/
package parquet.column;
-import parquet.column.statistics.Statistics;
import parquet.io.api.Binary;
/**
@@ -81,16 +80,5 @@ public interface ColumnWriter {
*/
void writeNull(int repetitionLevel, int definitionLevel);
- /**
- * Flushes the underlying store. This should be called when there are no
- * remaining triplets to be written.
- */
- void flush();
-
- /**
- * used to decide when to write a page or row group
- * @return the number of bytes of memory used to buffer the current data
- */
- long getBufferedSizeInMemory();
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
index aea02ad..dc7774f 100644
--- a/parquet-column/src/main/java/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/parquet/column/ParquetProperties.java
@@ -4,6 +4,9 @@ import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static parquet.column.Encoding.PLAIN;
import static parquet.column.Encoding.PLAIN_DICTIONARY;
import static parquet.column.Encoding.RLE_DICTIONARY;
+import parquet.column.impl.ColumnWriteStoreV1;
+import parquet.column.impl.ColumnWriteStoreV2;
+import parquet.column.page.PageWriteStore;
import parquet.column.values.ValuesWriter;
import parquet.column.values.boundedint.DevNullValuesWriter;
import parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
@@ -20,6 +23,7 @@ import parquet.column.values.plain.BooleanPlainValuesWriter;
import parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
import parquet.column.values.plain.PlainValuesWriter;
import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import parquet.schema.MessageType;
/**
* This class represents all the configurable Parquet properties.
@@ -195,4 +199,25 @@ public class ParquetProperties {
public boolean isEnableDictionary() {
return enableDictionary;
}
+
+ public ColumnWriteStore newColumnWriteStore(
+ MessageType schema,
+ PageWriteStore pageStore, int pageSize,
+ int initialPageBufferSize) {
+ switch (writerVersion) {
+ case PARQUET_1_0:
+ return new ColumnWriteStoreV1(
+ pageStore,
+ pageSize, initialPageBufferSize, dictionaryPageSizeThreshold,
+ enableDictionary, writerVersion);
+ case PARQUET_2_0:
+ return new ColumnWriteStoreV2(
+ schema,
+ pageStore,
+ pageSize, initialPageBufferSize,
+ new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+ default:
+ throw new IllegalArgumentException("unknown version " + writerVersion);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
index a58bfd9..dfbdc71 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
@@ -18,18 +18,27 @@ package parquet.column.impl;
import static java.lang.String.format;
import static parquet.Log.DEBUG;
import static parquet.Preconditions.checkNotNull;
+import static parquet.column.ValuesType.DEFINITION_LEVEL;
+import static parquet.column.ValuesType.REPETITION_LEVEL;
+import static parquet.column.ValuesType.VALUES;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.Dictionary;
-import parquet.column.ValuesType;
+import parquet.column.Encoding;
+import parquet.column.page.DataPage;
+import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
import parquet.column.page.PageReader;
import parquet.column.values.ValuesReader;
+import parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import parquet.io.ParquetDecodingException;
import parquet.io.api.Binary;
import parquet.io.api.PrimitiveConverter;
@@ -95,7 +104,7 @@ class ColumnReaderImpl implements ColumnReader {
public long getLong() {
throw new UnsupportedOperationException();
}
-
+
/**
* @return current value
*/
@@ -123,8 +132,8 @@ class ColumnReaderImpl implements ColumnReader {
private final PageReader pageReader;
private final Dictionary dictionary;
- private ValuesReader repetitionLevelColumn;
- private ValuesReader definitionLevelColumn;
+ private IntIterator repetitionLevelColumn;
+ private IntIterator definitionLevelColumn;
protected ValuesReader dataColumn;
private int repetitionLevel;
@@ -478,8 +487,8 @@ class ColumnReaderImpl implements ColumnReader {
// TODO: change the logic around read() to not tie together reading from the 3 columns
private void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.readInteger();
- definitionLevel = definitionLevelColumn.readInteger();
+ repetitionLevel = repetitionLevelColumn.nextInt();
+ definitionLevel = definitionLevelColumn.nextInt();
++readValues;
}
@@ -497,42 +506,91 @@ class ColumnReaderImpl implements ColumnReader {
private void readPage() {
if (DEBUG) LOG.debug("loading page");
- Page page = pageReader.readPage();
+ DataPage page = pageReader.readPage();
+ page.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
- this.repetitionLevelColumn = page.getRlEncoding().getValuesReader(path, ValuesType.REPETITION_LEVEL);
- this.definitionLevelColumn = page.getDlEncoding().getValuesReader(path, ValuesType.DEFINITION_LEVEL);
- if (page.getValueEncoding().usesDictionary()) {
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = readValues + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
if (dictionary == null) {
throw new ParquetDecodingException(
- "could not read page " + page + " in col " + path + " as the dictionary was missing for encoding " + page.getValueEncoding());
+ "could not read page in col " + path + " as the dictionary was missing for encoding " + dataEncoding);
}
- this.dataColumn = page.getValueEncoding().getDictionaryBasedValuesReader(path, ValuesType.VALUES, dictionary);
+ this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(path, VALUES, dictionary);
} else {
- this.dataColumn = page.getValueEncoding().getValuesReader(path, ValuesType.VALUES);
+ this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
}
- if (page.getValueEncoding().usesDictionary() && converter.hasDictionarySupport()) {
+ if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
bindToDictionary(dictionary);
} else {
bind(path.getType());
}
- this.pageValueCount = page.getValueCount();
- this.endOfPageValueCount = readValues + pageValueCount;
+ try {
+ dataColumn.initFromPage(pageValueCount, bytes, offset);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page in col " + path, e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
+ ValuesReader dlReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
 try {
 byte[] bytes = page.getBytes().toByteArray();
 if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
 if (DEBUG) LOG.debug("reading repetition levels at 0");
- repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
- int next = repetitionLevelColumn.getNextOffset();
+ rlReader.initFromPage(page.getValueCount(), bytes, 0); // this page's count: pageValueCount is only set later in initDataReader
+ int next = rlReader.getNextOffset();
 if (DEBUG) LOG.debug("reading definition levels at " + next);
- definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
- next = definitionLevelColumn.getNextOffset();
+ dlReader.initFromPage(page.getValueCount(), bytes, next); // same reason: avoid the previous page's stale count
+ next = dlReader.getNextOffset();
 if (DEBUG) LOG.debug("reading data at " + next);
- dataColumn.initFromPage(pageValueCount, bytes, next);
+ initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
 } catch (IOException e) {
 throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
 }
 }
+ private void readPageV2(DataPageV2 page) {
+ this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
+ this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ try {
+ if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + page.getValueCount() + " records");
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read levels in page for col " + path, e);
+ }
+ }
+
private boolean isPageFullyConsumed() {
return readValues >= endOfPageValueCount;
}
@@ -556,4 +614,45 @@ class ColumnReaderImpl implements ColumnReader {
return totalValueCount;
}
+ static abstract class IntIterator {
+ abstract int nextInt();
+ }
+
+ static class ValuesReaderIntIterator extends IntIterator {
+ ValuesReader delegate;
+
+ public ValuesReaderIntIterator(ValuesReader delegate) {
+ super();
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ return delegate.readInteger();
+ }
+ }
+
+ static class RLEIntIterator extends IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
+
+ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+ }
+
+ private static final class NullIntIterator extends IntIterator {
+ @Override
+ int nextInt() {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
deleted file mode 100644
index 9d3b15c..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.column.impl;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriteStore;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-
-
-public class ColumnWriteStoreImpl implements ColumnWriteStore {
-
- private final Map<ColumnDescriptor, ColumnWriterImpl> columns = new TreeMap<ColumnDescriptor, ColumnWriterImpl>();
- private final PageWriteStore pageWriteStore;
- private final int pageSizeThreshold;
- private final int dictionaryPageSizeThreshold;
- private final boolean enableDictionary;
- private final int initialSizePerCol;
- private final WriterVersion writerVersion;
-
- public ColumnWriteStoreImpl(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
- super();
- this.pageWriteStore = pageWriteStore;
- this.pageSizeThreshold = pageSizeThreshold;
- this.initialSizePerCol = initialSizePerCol;
- this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
- this.enableDictionary = enableDictionary;
- this.writerVersion = writerVersion;
- }
-
- public ColumnWriter getColumnWriter(ColumnDescriptor path) {
- ColumnWriterImpl column = columns.get(path);
- if (column == null) {
- column = newMemColumn(path);
- columns.put(path, column);
- }
- return column;
- }
-
- public Set<ColumnDescriptor> getColumnDescriptors() {
- return columns.keySet();
- }
-
- private ColumnWriterImpl newMemColumn(ColumnDescriptor path) {
- PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- return new ColumnWriterImpl(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Entry<ColumnDescriptor, ColumnWriterImpl> entry : columns.entrySet()) {
- sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
- sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
- sb.append("\n");
- }
- return sb.toString();
- }
-
- public long allocatedSize() {
- Collection<ColumnWriterImpl> values = columns.values();
- long total = 0;
- for (ColumnWriterImpl memColumn : values) {
- total += memColumn.allocatedSize();
- }
- return total;
- }
-
- public long memSize() {
- Collection<ColumnWriterImpl> values = columns.values();
- long total = 0;
- for (ColumnWriterImpl memColumn : values) {
- total += memColumn.getBufferedSizeInMemory();
- }
- return total;
- }
-
- public long maxColMemSize() {
- Collection<ColumnWriterImpl> values = columns.values();
- long max = 0;
- for (ColumnWriterImpl memColumn : values) {
- max = Math.max(max, memColumn.getBufferedSizeInMemory());
- }
- return max;
- }
-
- @Override
- public void flush() {
- Collection<ColumnWriterImpl> values = columns.values();
- for (ColumnWriterImpl memColumn : values) {
- memColumn.flush();
- }
- }
-
- public String memUsageString() {
- StringBuilder b = new StringBuilder("Store {\n");
- Collection<ColumnWriterImpl> values = columns.values();
- for (ColumnWriterImpl memColumn : values) {
- b.append(memColumn.memUsageString(" "));
- }
- b.append("}\n");
- return b.toString();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
new file mode 100644
index 0000000..884c665
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV1.java
@@ -0,0 +1,134 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+
+public class ColumnWriteStoreV1 implements ColumnWriteStore {
+
+ private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
+ private final PageWriteStore pageWriteStore;
+ private final int pageSizeThreshold;
+ private final int dictionaryPageSizeThreshold;
+ private final boolean enableDictionary;
+ private final int initialSizePerCol;
+ private final WriterVersion writerVersion;
+
+ public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int initialSizePerCol, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+ super();
+ this.pageWriteStore = pageWriteStore;
+ this.pageSizeThreshold = pageSizeThreshold;
+ this.initialSizePerCol = initialSizePerCol;
+ this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
+ this.enableDictionary = enableDictionary;
+ this.writerVersion = writerVersion;
+ }
+
+ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+ ColumnWriterV1 column = columns.get(path);
+ if (column == null) {
+ column = newMemColumn(path);
+ columns.put(path, column);
+ }
+ return column;
+ }
+
+ public Set<ColumnDescriptor> getColumnDescriptors() {
+ return columns.keySet();
+ }
+
+ private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
+ PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+ return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, initialSizePerCol, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
+ sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+ sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ Collection<ColumnWriterV1> values = columns.values();
+ long total = 0;
+ for (ColumnWriterV1 memColumn : values) {
+ total += memColumn.allocatedSize();
+ }
+ return total;
+ }
+
+ @Override
+ public long getBufferedSize() {
+ Collection<ColumnWriterV1> values = columns.values();
+ long total = 0;
+ for (ColumnWriterV1 memColumn : values) {
+ total += memColumn.getBufferedSizeInMemory();
+ }
+ return total;
+ }
+
+ @Override
+ public String memUsageString() {
+ StringBuilder b = new StringBuilder("Store {\n");
+ Collection<ColumnWriterV1> values = columns.values();
+ for (ColumnWriterV1 memColumn : values) {
+ b.append(memColumn.memUsageString(" "));
+ }
+ b.append("}\n");
+ return b.toString();
+ }
+
+ public long maxColMemSize() {
+ Collection<ColumnWriterV1> values = columns.values();
+ long max = 0;
+ for (ColumnWriterV1 memColumn : values) {
+ max = Math.max(max, memColumn.getBufferedSizeInMemory());
+ }
+ return max;
+ }
+
+ @Override
+ public void flush() {
+ Collection<ColumnWriterV1> values = columns.values();
+ for (ColumnWriterV1 memColumn : values) {
+ memColumn.flush();
+ }
+ }
+
+ @Override
+ public void endRecord() {
+ // V1 does not take record boundaries into account
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
new file mode 100644
index 0000000..ba6edf3
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriteStoreV2.java
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriteStore;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.schema.MessageType;
+
+public class ColumnWriteStoreV2 implements ColumnWriteStore {
+
+ // will wait for at least that many records before checking again
+ private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+ private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+ // will flush even if size below the threshold by this much to facilitate page alignment
+ private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
+
+ private final Map<ColumnDescriptor, ColumnWriterV2> columns;
+ private final Collection<ColumnWriterV2> writers;
+ private long rowCount;
+ private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ private final long thresholdTolerance;
+
+ private int pageSizeThreshold;
+
+ public ColumnWriteStoreV2(
+ MessageType schema,
+ PageWriteStore pageWriteStore,
+ int pageSizeThreshold, int initialSizePerCol,
+ ParquetProperties parquetProps) {
+ super();
+ this.pageSizeThreshold = pageSizeThreshold;
+ this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+ Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+ mcolumns.put(path, new ColumnWriterV2(path, pageWriter, initialSizePerCol, parquetProps));
+ }
+ this.columns = unmodifiableMap(mcolumns);
+ this.writers = this.columns.values();
+ }
+
+ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+ return columns.get(path);
+ }
+
+ public Set<ColumnDescriptor> getColumnDescriptors() {
+ return columns.keySet();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Entry<ColumnDescriptor, ColumnWriterV2> entry : columns.entrySet()) {
+ sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
+ sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public long getAllocatedSize() {
+ long total = 0;
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ total += memColumn.allocatedSize();
+ }
+ return total;
+ }
+
+ @Override
+ public long getBufferedSize() {
+ long total = 0;
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ total += memColumn.getTotalBufferedSize();
+ }
+ return total;
+ }
+
+ @Override
+ public void flush() {
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ long rows = rowCount - memColumn.getRowsWrittenSoFar();
+ if (rows > 0) {
+ memColumn.writePage(rowCount);
+ }
+ memColumn.finalizeColumnChunk();
+ }
+ }
+
+ public String memUsageString() {
+ StringBuilder b = new StringBuilder("Store {\n");
+ for (ColumnWriterV2 memColumn : columns.values()) {
+ b.append(memColumn.memUsageString(" "));
+ }
+ b.append("}\n");
+ return b.toString();
+ }
+
+ @Override
+ public void endRecord() {
+ ++ rowCount;
+ if (rowCount >= rowCountForNextSizeCheck) {
+ sizeCheck();
+ }
+ }
+
+ private void sizeCheck() {
+ long minRecordToWait = Long.MAX_VALUE;
+ for (ColumnWriterV2 writer : writers) {
+ long usedMem = writer.getCurrentPageBufferedSize();
+ long rows = rowCount - writer.getRowsWrittenSoFar();
+ long remainingMem = pageSizeThreshold - usedMem;
+ if (remainingMem <= thresholdTolerance) {
+ writer.writePage(rowCount);
+ remainingMem = pageSizeThreshold;
+ }
+ long rowsToFillPage =
+ usedMem == 0 ?
+ MAXIMUM_RECORD_COUNT_FOR_CHECK
+ : (long)((float)rows / usedMem * remainingMem); // keep the rows-per-byte rate in float; casting first made it integer division
+ if (rowsToFillPage < minRecordToWait) {
+ minRecordToWait = rowsToFillPage;
+ }
+ }
+ if (minRecordToWait == Long.MAX_VALUE) {
+ minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ }
+ // will check again halfway
+ rowCountForNextSizeCheck = rowCount +
+ min(
+ max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
+ MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
deleted file mode 100644
index 628b848..0000000
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterImpl.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.column.impl;
-
-import static parquet.bytes.BytesInput.concat;
-
-import java.io.IOException;
-
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.ColumnWriter;
-import parquet.column.ParquetProperties;
-import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.column.values.ValuesWriter;
-import parquet.io.ParquetEncodingException;
-import parquet.io.api.Binary;
-
-/**
- * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
- *
- * @author Julien Le Dem
- *
- */
-final class ColumnWriterImpl implements ColumnWriter {
- private static final Log LOG = Log.getLog(ColumnWriterImpl.class);
- private static final boolean DEBUG = Log.DEBUG;
- private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
-
- private final ColumnDescriptor path;
- private final PageWriter pageWriter;
- private final long pageSizeThreshold;
- private ValuesWriter repetitionLevelColumn;
- private ValuesWriter definitionLevelColumn;
- private ValuesWriter dataColumn;
- private int valueCount;
- private int valueCountForNextSizeCheck;
-
- private Statistics statistics;
-
- public ColumnWriterImpl(
- ColumnDescriptor path,
- PageWriter pageWriter,
- int pageSizeThreshold,
- int initialSizePerCol,
- int dictionaryPageSizeThreshold,
- boolean enableDictionary,
- WriterVersion writerVersion) {
- this.path = path;
- this.pageWriter = pageWriter;
- this.pageSizeThreshold = pageSizeThreshold;
- // initial check of memory usage. So that we have enough data to make an initial prediction
- this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
- resetStatistics();
-
- ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
- this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
- this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
- this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
- }
-
- private void initStatistics() {
- this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
- }
-
- private void log(Object value, int r, int d) {
- LOG.debug(path + " " + value + " r:" + r + " d:" + d);
- }
-
- private void resetStatistics() {
- this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
- }
-
- /**
- * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
- *
- * We measure the memory used when we reach the mid point toward our estimated count.
- * We then update the estimate and flush the page if we reached the threshold.
- *
- * That way we check the memory size log2(n) times.
- *
- */
- private void accountForValueWritten() {
- ++ valueCount;
- if (valueCount > valueCountForNextSizeCheck) {
- // not checking the memory used for every value
- long memSize = repetitionLevelColumn.getBufferedSize()
- + definitionLevelColumn.getBufferedSize()
- + dataColumn.getBufferedSize();
- if (memSize > pageSizeThreshold) {
- // we will write the current page and check again the size at the predicted middle of next page
- valueCountForNextSizeCheck = valueCount / 2;
- writePage();
- } else {
- // not reached the threshold, will check again midway
- valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
- }
- }
- }
-
- private void updateStatisticsNumNulls() {
- statistics.incrementNumNulls();
- }
-
- private void updateStatistics(int value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(long value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(float value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(double value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(Binary value) {
- statistics.updateStats(value);
- }
-
- private void updateStatistics(boolean value) {
- statistics.updateStats(value);
- }
-
- private void writePage() {
- if (DEBUG) LOG.debug("write page");
- try {
- pageWriter.writePage(
- concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
- valueCount,
- statistics,
- repetitionLevelColumn.getEncoding(),
- definitionLevelColumn.getEncoding(),
- dataColumn.getEncoding());
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write page for " + path, e);
- }
- repetitionLevelColumn.reset();
- definitionLevelColumn.reset();
- dataColumn.reset();
- valueCount = 0;
- resetStatistics();
- }
-
- @Override
- public void writeNull(int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(null, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- updateStatisticsNumNulls();
- accountForValueWritten();
- }
-
- @Override
- public void write(double value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeDouble(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(float value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeFloat(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(Binary value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeBytes(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(boolean value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeBoolean(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(int value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeInteger(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void write(long value, int repetitionLevel, int definitionLevel) {
- if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevelColumn.writeInteger(repetitionLevel);
- definitionLevelColumn.writeInteger(definitionLevel);
- dataColumn.writeLong(value);
- updateStatistics(value);
- accountForValueWritten();
- }
-
- @Override
- public void flush() {
- if (valueCount > 0) {
- writePage();
- }
- final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
- if (dictionaryPage != null) {
- if (DEBUG) LOG.debug("write dictionary");
- try {
- pageWriter.writeDictionaryPage(dictionaryPage);
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write dictionary page for " + path, e);
- }
- dataColumn.resetDictionary();
- }
- }
-
- @Override
- public long getBufferedSizeInMemory() {
- return repetitionLevelColumn.getBufferedSize()
- + definitionLevelColumn.getBufferedSize()
- + dataColumn.getBufferedSize()
- + pageWriter.getMemSize();
- }
-
- public long allocatedSize() {
- return repetitionLevelColumn.getAllocatedSize()
- + definitionLevelColumn.getAllocatedSize()
- + dataColumn.getAllocatedSize()
- + pageWriter.allocatedSize();
- }
-
- public String memUsageString(String indent) {
- StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
- b.append(repetitionLevelColumn.memUsageString(indent + " r:")).append("\n");
- b.append(definitionLevelColumn.memUsageString(indent + " d:")).append("\n");
- b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
- b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
- b.append(indent).append(String.format(" total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
- b.append(indent).append("}\n");
- return b.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
new file mode 100644
index 0000000..8b72207
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV1.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static parquet.bytes.BytesInput.concat;
+
+import java.io.IOException;
+
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriter;
+import parquet.column.ParquetProperties;
+import parquet.column.ParquetProperties.WriterVersion;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.Statistics;
+import parquet.column.values.ValuesWriter;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV1 implements ColumnWriter {
+ private static final Log LOG = Log.getLog(ColumnWriterV1.class);
+ private static final boolean DEBUG = Log.DEBUG;
+ private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
+
+ private final ColumnDescriptor path;
+ private final PageWriter pageWriter;
+ private final long pageSizeThreshold;
+ private ValuesWriter repetitionLevelColumn;
+ private ValuesWriter definitionLevelColumn;
+ private ValuesWriter dataColumn;
+ private int valueCount;
+ private int valueCountForNextSizeCheck;
+
+ private Statistics statistics;
+
+ public ColumnWriterV1(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ int pageSizeThreshold,
+ int initialSizePerCol,
+ int dictionaryPageSizeThreshold,
+ boolean enableDictionary,
+ WriterVersion writerVersion) {
+ this.path = path;
+ this.pageWriter = pageWriter;
+ this.pageSizeThreshold = pageSizeThreshold;
+ // initial check of memory usage. So that we have enough data to make an initial prediction
+ this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
+ resetStatistics();
+
+ ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+ this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
+ this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
+ this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
+ }
+
+ private void log(Object value, int r, int d) {
+ LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+ }
+
+ private void resetStatistics() {
+ this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+ }
+
+ /**
+ * Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
+ *
+ * We measure the memory used when we reach the mid point toward our estimated count.
+ * We then update the estimate and flush the page if we reached the threshold.
+ *
+ * That way we check the memory size log2(n) times.
+ *
+ */
+ private void accountForValueWritten() {
+ ++ valueCount;
+ if (valueCount > valueCountForNextSizeCheck) {
+ // not checking the memory used for every value
+ long memSize = repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize();
+ if (memSize > pageSizeThreshold) {
+ // we will write the current page and check again the size at the predicted middle of next page
+ valueCountForNextSizeCheck = valueCount / 2;
+ writePage();
+ } else {
+ // not reached the threshold, will check again midway
+ valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
+ }
+ }
+ }
+
+ private void updateStatisticsNumNulls() {
+ statistics.incrementNumNulls();
+ }
+
+ private void updateStatistics(int value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(long value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(float value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(double value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(Binary value) {
+ statistics.updateStats(value);
+ }
+
+ private void updateStatistics(boolean value) {
+ statistics.updateStats(value);
+ }
+
+ private void writePage() {
+ if (DEBUG) LOG.debug("write page");
+ try {
+ pageWriter.writePage(
+ concat(repetitionLevelColumn.getBytes(), definitionLevelColumn.getBytes(), dataColumn.getBytes()),
+ valueCount,
+ statistics,
+ repetitionLevelColumn.getEncoding(),
+ definitionLevelColumn.getEncoding(),
+ dataColumn.getEncoding());
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page for " + path, e);
+ }
+ repetitionLevelColumn.reset();
+ definitionLevelColumn.reset();
+ dataColumn.reset();
+ valueCount = 0;
+ resetStatistics();
+ }
+
+ @Override
+ public void writeNull(int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(null, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ updateStatisticsNumNulls();
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(double value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeDouble(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeFloat(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(Binary value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeBytes(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(boolean value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeBoolean(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(int value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeInteger(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ @Override
+ public void write(long value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevelColumn.writeInteger(repetitionLevel);
+ definitionLevelColumn.writeInteger(definitionLevel);
+ dataColumn.writeLong(value);
+ updateStatistics(value);
+ accountForValueWritten();
+ }
+
+ public void flush() {
+ if (valueCount > 0) {
+ writePage();
+ }
+ final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+ if (dictionaryPage != null) {
+ if (DEBUG) LOG.debug("write dictionary");
+ try {
+ pageWriter.writeDictionaryPage(dictionaryPage);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+ }
+ dataColumn.resetDictionary();
+ }
+ }
+
+ public long getBufferedSizeInMemory() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize()
+ + pageWriter.getMemSize();
+ }
+
+ public long allocatedSize() {
+ return repetitionLevelColumn.getAllocatedSize()
+ + definitionLevelColumn.getAllocatedSize()
+ + dataColumn.getAllocatedSize()
+ + pageWriter.allocatedSize();
+ }
+
+ public String memUsageString(String indent) {
+ StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+ b.append(repetitionLevelColumn.memUsageString(indent + " r:")).append("\n");
+ b.append(definitionLevelColumn.memUsageString(indent + " d:")).append("\n");
+ b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
+ b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
+ b.append(indent).append(String.format(" total: %,d/%,d", getBufferedSizeInMemory(), allocatedSize())).append("\n");
+ b.append(indent).append("}\n");
+ return b.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
new file mode 100644
index 0000000..65f0366
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnWriterV2.java
@@ -0,0 +1,295 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.impl;
+
+import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
+import java.io.IOException;
+
+import parquet.Ints;
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnWriter;
+import parquet.column.Encoding;
+import parquet.column.ParquetProperties;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.Statistics;
+import parquet.column.values.ValuesWriter;
+import parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+
+/**
+ * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+final class ColumnWriterV2 implements ColumnWriter {
+ private static final Log LOG = Log.getLog(ColumnWriterV2.class);
+ private static final boolean DEBUG = Log.DEBUG;
+
+ private final ColumnDescriptor path;
+ private final PageWriter pageWriter;
+ private RunLengthBitPackingHybridEncoder repetitionLevelColumn;
+ private RunLengthBitPackingHybridEncoder definitionLevelColumn;
+ private ValuesWriter dataColumn;
+ private int valueCount;
+
+ private Statistics<?> statistics;
+ private long rowsWrittenSoFar = 0;
+
+ public ColumnWriterV2(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ int initialSizePerCol,
+ ParquetProperties parquetProps) {
+ this.path = path;
+ this.pageWriter = pageWriter;
+ resetStatistics();
+ this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), initialSizePerCol);
+ this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), initialSizePerCol);
+ this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
+ }
+
+ private void log(Object value, int r, int d) {
+ LOG.debug(path + " " + value + " r:" + r + " d:" + d);
+ }
+
+ private void resetStatistics() {
+ this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
+ }
+
+ private void definitionLevel(int definitionLevel) {
+ try {
+ definitionLevelColumn.writeInt(definitionLevel);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("illegal definition level " + definitionLevel + " for column " + path, e);
+ }
+ }
+
+ private void repetitionLevel(int repetitionLevel) {
+ try {
+ repetitionLevelColumn.writeInt(repetitionLevel);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("illegal repetition level " + repetitionLevel + " for column " + path, e);
+ }
+ }
+
+ /**
+ * writes the current null value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void writeNull(int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(null, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ statistics.incrementNumNulls();
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(double value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeDouble(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeFloat(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(Binary value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBytes(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(boolean value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBoolean(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(int value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeInteger(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ public void write(long value, int repetitionLevel, int definitionLevel) {
+ if (DEBUG) log(value, repetitionLevel, definitionLevel);
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeLong(value);
+ statistics.updateStats(value);
+ ++ valueCount;
+ }
+
+ /**
+ * Finalizes the Column chunk. Possibly adding extra pages if needed (dictionary, ...)
+ * Is called right after writePage
+ */
+ public void finalizeColumnChunk() {
+ final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+ if (dictionaryPage != null) {
+ if (DEBUG) LOG.debug("write dictionary");
+ try {
+ pageWriter.writeDictionaryPage(dictionaryPage);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+ }
+ dataColumn.resetDictionary();
+ }
+ }
+
+ /**
+ * used to decide when to write a page
+ * @return the number of bytes of memory used to buffer the current data
+ */
+ public long getCurrentPageBufferedSize() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize();
+ }
+
+ /**
+ * used to decide when to write a page or row group
+ * @return the number of bytes of memory used to buffer the current data and the previously written pages
+ */
+ public long getTotalBufferedSize() {
+ return repetitionLevelColumn.getBufferedSize()
+ + definitionLevelColumn.getBufferedSize()
+ + dataColumn.getBufferedSize()
+ + pageWriter.getMemSize();
+ }
+
+ /**
+ * @return actual memory used
+ */
+ public long allocatedSize() {
+ return repetitionLevelColumn.getAllocatedSize()
+ + definitionLevelColumn.getAllocatedSize()
+ + dataColumn.getAllocatedSize()
+ + pageWriter.allocatedSize();
+ }
+
+ /**
+ * @param prefix a prefix to format lines
+ * @return a formatted string showing how memory is used
+ */
+ public String memUsageString(String indent) {
+ StringBuilder b = new StringBuilder(indent).append(path).append(" {\n");
+ b.append(indent).append(" r:").append(repetitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+ b.append(indent).append(" d:").append(definitionLevelColumn.getAllocatedSize()).append(" bytes\n");
+ b.append(dataColumn.memUsageString(indent + " data:")).append("\n");
+ b.append(pageWriter.memUsageString(indent + " pages:")).append("\n");
+ b.append(indent).append(String.format(" total: %,d/%,d", getTotalBufferedSize(), allocatedSize())).append("\n");
+ b.append(indent).append("}\n");
+ return b.toString();
+ }
+
+ public long getRowsWrittenSoFar() {
+ return this.rowsWrittenSoFar;
+ }
+
+ /**
+ * writes the current data to a new page in the page store
+ * @param rowCount how many rows have been written so far
+ */
+ public void writePage(long rowCount) {
+ int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
+ this.rowsWrittenSoFar = rowCount;
+ if (DEBUG) LOG.debug("write page");
+ try {
+ // TODO: rework this API. Those must be called *in that order*
+ BytesInput bytes = dataColumn.getBytes();
+ Encoding encoding = dataColumn.getEncoding();
+ pageWriter.writePageV2(
+ pageRowCount,
+ Ints.checkedCast(statistics.getNumNulls()),
+ valueCount,
+ path.getMaxRepetitionLevel() == 0 ? BytesInput.empty() : repetitionLevelColumn.toBytes(),
+ path.getMaxDefinitionLevel() == 0 ? BytesInput.empty() : definitionLevelColumn.toBytes(),
+ encoding,
+ bytes,
+ statistics
+ );
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page for " + path, e);
+ }
+ repetitionLevelColumn.reset();
+ definitionLevelColumn.reset();
+ dataColumn.reset();
+ valueCount = 0;
+ resetStatistics();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPage.java b/parquet-column/src/main/java/parquet/column/page/DataPage.java
new file mode 100644
index 0000000..3a1afa0
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPage.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.page;
+
+/**
+ * One data page of a column chunk, carrying the number of values it holds.
+ *
+ * Concrete page versions are distinguished through {@link #accept(Visitor)}.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class DataPage extends Page {
+
+  private final int valueCount;
+
+  DataPage(int compressedSize, int uncompressedSize, int valueCount) {
+    super(compressedSize, uncompressedSize);
+    this.valueCount = valueCount;
+  }
+
+  /**
+   * @return how many values (nulls included, presumably — confirm against writers) this page holds
+   */
+  public int getValueCount() {
+    return valueCount;
+  }
+
+  /**
+   * Lets a visitor handle this page according to its concrete version.
+   *
+   * @param visitor the callback receiving this page
+   * @return whatever the visitor returns
+   */
+  public abstract <T> T accept(Visitor<T> visitor);
+
+  /**
+   * Callback with one method per concrete data page version.
+   *
+   * @param <T> the result type of the visit
+   */
+  public interface Visitor<T> {
+
+    T visit(DataPageV1 dataPageV1);
+
+    T visit(DataPageV2 dataPageV2);
+
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV1.java b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
new file mode 100644
index 0000000..a53eed8
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPageV1.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.page;
+
+import parquet.Ints;
+import parquet.bytes.BytesInput;
+import parquet.column.Encoding;
+import parquet.column.statistics.Statistics;
+
+/**
+ * A data page in the original (V1) format: the repetition levels, definition
+ * levels and values share a single buffer, each with its own {@link Encoding}.
+ */
+public class DataPageV1 extends DataPage {
+
+  private final BytesInput bytes;
+  private final Statistics<?> statistics;
+  private final Encoding rlEncoding;
+  private final Encoding dlEncoding;
+  private final Encoding valuesEncoding;
+
+  /**
+   * @param bytes the bytes for this page
+   * @param valueCount count of values in this page
+   * @param uncompressedSize the uncompressed size of the page
+   * @param statistics the statistics of the page's values (max, min, num_nulls)
+   * @param rlEncoding the repetition level encoding for this page
+   * @param dlEncoding the definition level encoding for this page
+   * @param valuesEncoding the values encoding for this page
+   */
+  public DataPageV1(BytesInput bytes, int valueCount, int uncompressedSize, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
+    super(Ints.checkedCast(bytes.size()), uncompressedSize, valueCount);
+    this.bytes = bytes;
+    this.statistics = statistics;
+    this.rlEncoding = rlEncoding;
+    this.dlEncoding = dlEncoding;
+    this.valuesEncoding = valuesEncoding;
+  }
+
+  /**
+   * @return the bytes for the page
+   */
+  public BytesInput getBytes() {
+    return bytes;
+  }
+
+  /**
+   * @return the statistics for this page (max, min, num_nulls)
+   */
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  /**
+   * @return the definition level encoding for this page
+   */
+  public Encoding getDlEncoding() {
+    return dlEncoding;
+  }
+
+  /**
+   * @return the repetition level encoding for this page
+   */
+  public Encoding getRlEncoding() {
+    return rlEncoding;
+  }
+
+  /**
+   * @return the values encoding for this page
+   */
+  public Encoding getValueEncoding() {
+    return valuesEncoding;
+  }
+
+  @Override
+  public String toString() {
+    return "Page [bytes.size=" + bytes.size() + ", valueCount=" + getValueCount() + ", uncompressedSize=" + getUncompressedSize() + "]";
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
new file mode 100644
index 0000000..bc9a873
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/column/page/DataPageV2.java
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.column.page;
+
+import parquet.Ints;
+import parquet.bytes.BytesInput;
+import parquet.column.Encoding;
+import parquet.column.statistics.Statistics;
+
+/**
+ * A data page in the V2 format: the repetition levels, definition levels and
+ * data are held in separate buffers, and only the data may be compressed
+ * (see {@link #isCompressed()}).
+ */
+public class DataPageV2 extends DataPage {
+
+  /**
+   * Creates a page whose data is not compressed; its uncompressed size is the
+   * sum of the sizes of the repetition levels, definition levels and data.
+   *
+   * @param rowCount the number of rows in this page
+   * @param nullCount the number of null values (out of valueCount)
+   * @param valueCount the number of values in this page (there can be several values per row for repeated fields)
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding
+   * @param statistics optional statistics for this page
+   * @return an uncompressed page
+   */
+  public static DataPageV2 uncompressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      Statistics<?> statistics) {
+    return new DataPageV2(
+        rowCount, nullCount, valueCount,
+        repetitionLevels, definitionLevels,
+        dataEncoding, data,
+        Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()),
+        statistics,
+        false);
+  }
+
+  /**
+   * Creates a page whose data has already been compressed.
+   *
+   * @param rowCount the number of rows in this page
+   * @param nullCount the number of null values (out of valueCount)
+   * @param valueCount the number of values in this page (there can be several values per row for repeated fields)
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding and compressed
+   * @param uncompressedSize total size uncompressed (rl + dl + data)
+   * @param statistics optional statistics for this page
+   * @return a compressed page
+   */
+  public static DataPageV2 compressed(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize,
+      Statistics<?> statistics) {
+    return new DataPageV2(
+        rowCount, nullCount, valueCount,
+        repetitionLevels, definitionLevels,
+        dataEncoding, data,
+        uncompressedSize,
+        statistics,
+        true);
+  }
+
+  private final int rowCount;
+  private final int nullCount;
+  private final BytesInput repetitionLevels;
+  private final BytesInput definitionLevels;
+  private final Encoding dataEncoding;
+  private final BytesInput data;
+  private final Statistics<?> statistics;
+  private final boolean isCompressed;
+
+  /**
+   * The compressed size of this page (see {@link #getCompressedSize()}) is the
+   * sum of the sizes of repetitionLevels, definitionLevels and data as given.
+   *
+   * @param rowCount the number of rows in this page
+   * @param nullCount the number of null values (out of valueCount)
+   * @param valueCount the number of values in this page
+   * @param repetitionLevels RLE encoded repetition levels
+   * @param definitionLevels RLE encoded definition levels
+   * @param dataEncoding encoding for the data
+   * @param data data encoded with dataEncoding, compressed if isCompressed is true
+   * @param uncompressedSize total size uncompressed (rl + dl + data)
+   * @param statistics optional statistics for this page
+   * @param isCompressed whether data is compressed
+   */
+  public DataPageV2(
+      int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data,
+      int uncompressedSize,
+      Statistics<?> statistics,
+      boolean isCompressed) {
+    super(Ints.checkedCast(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, valueCount);
+    this.rowCount = rowCount;
+    this.nullCount = nullCount;
+    this.repetitionLevels = repetitionLevels;
+    this.definitionLevels = definitionLevels;
+    this.dataEncoding = dataEncoding;
+    this.data = data;
+    this.statistics = statistics;
+    this.isCompressed = isCompressed;
+  }
+
+  /**
+   * @return the number of rows in this page
+   */
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  /**
+   * @return the number of null values in this page (out of {@link #getValueCount()})
+   */
+  public int getNullCount() {
+    return nullCount;
+  }
+
+  /**
+   * @return the RLE encoded repetition levels
+   */
+  public BytesInput getRepetitionLevels() {
+    return repetitionLevels;
+  }
+
+  /**
+   * @return the RLE encoded definition levels
+   */
+  public BytesInput getDefinitionLevels() {
+    return definitionLevels;
+  }
+
+  /**
+   * @return the encoding of the data
+   */
+  public Encoding getDataEncoding() {
+    return dataEncoding;
+  }
+
+  /**
+   * @return the data, compressed if {@link #isCompressed()} is true
+   */
+  public BytesInput getData() {
+    return data;
+  }
+
+  /**
+   * @return the optional statistics for this page
+   */
+  public Statistics<?> getStatistics() {
+    return statistics;
+  }
+
+  /**
+   * @return whether the data of this page is compressed
+   */
+  public boolean isCompressed() {
+    return isCompressed;
+  }
+
+  @Override
+  public <T> T accept(Visitor<T> visitor) {
+    return visitor.visit(this);
+  }
+
+  @Override
+  public String toString() {
+    return "Page V2 ["
+        + "dl size=" + definitionLevels.size() + ", "
+        + "rl size=" + repetitionLevels.size() + ", "
+        + "data size=" + data.size() + ", "
+        + "data enc=" + dataEncoding + ", "
+        + "valueCount=" + getValueCount() + ", "
+        + "rowCount=" + getRowCount() + ", "
+        + "is compressed=" + isCompressed + ", "
+        + "uncompressedSize=" + getUncompressedSize() + "]";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java b/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
index 78e88a2..9ae1037 100644
--- a/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
+++ b/parquet-column/src/main/java/parquet/column/page/DictionaryPage.java
@@ -19,6 +19,7 @@ import static parquet.Preconditions.checkNotNull;
import java.io.IOException;
+import parquet.Ints;
import parquet.bytes.BytesInput;
import parquet.column.Encoding;
@@ -28,10 +29,9 @@ import parquet.column.Encoding;
* @author Julien Le Dem
*
*/
-public class DictionaryPage {
+public class DictionaryPage extends Page {
private final BytesInput bytes;
- private final int uncompressedSize;
private final int dictionarySize;
private final Encoding encoding;
@@ -53,8 +53,9 @@ public class DictionaryPage {
* @param encoding the encoding used
*/
public DictionaryPage(BytesInput bytes, int uncompressedSize, int dictionarySize, Encoding encoding) {
+ // validate bytes before calling size() on it: otherwise a null fails inside super() and the checkNotNull below is never reached
+ super(Ints.checkedCast(checkNotNull(bytes, "bytes").size()), uncompressedSize);
this.bytes = checkNotNull(bytes, "bytes");
- this.uncompressedSize = uncompressedSize;
this.dictionarySize = dictionarySize;
this.encoding = checkNotNull(encoding, "encoding");
}
@@ -63,10 +64,6 @@ public class DictionaryPage {
return bytes;
}
- public int getUncompressedSize() {
- return uncompressedSize;
- }
-
public int getDictionarySize() {
return dictionarySize;
}
@@ -76,13 +73,13 @@ public class DictionaryPage {
}
public DictionaryPage copy() throws IOException {
- return new DictionaryPage(BytesInput.copy(bytes), uncompressedSize, dictionarySize, encoding);
+ return new DictionaryPage(BytesInput.copy(bytes), getUncompressedSize(), dictionarySize, encoding);
}
@Override
public String toString() {
- return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + uncompressedSize + ", encoding=" + encoding + "]";
+ return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize=" + getUncompressedSize() + ", encoding=" + encoding + "]";
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/Page.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/Page.java b/parquet-column/src/main/java/parquet/column/page/Page.java
index 192b2a9..e5ab636 100644
--- a/parquet-column/src/main/java/parquet/column/page/Page.java
+++ b/parquet-column/src/main/java/parquet/column/page/Page.java
@@ -1,145 +1,52 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet.column.page;
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
-import parquet.column.statistics.BooleanStatistics;
-
/**
* one page in a chunk
*
* @author Julien Le Dem
*
*/
-public class Page {
- private static final boolean DEBUG = Log.DEBUG;
- private static final Log LOG = Log.getLog(Page.class);
-
- private static int nextId = 0;
+abstract public class Page {
- private final BytesInput bytes;
- private final int valueCount;
+ private final int compressedSize;
private final int uncompressedSize;
- private final Statistics statistics;
- private final Encoding rlEncoding;
- private final Encoding dlEncoding;
- private final Encoding valuesEncoding;
- private final int id;
- @Deprecated
- /**
- * @param bytes the bytes for this page
- * @param valueCount count of values in this page
- * @param uncompressedSize the uncompressed size of the page
- * @param rlEncoding the repetition level encoding for this page
- * @param dlEncoding the definition level encoding for this page
- * @param valuesEncoding the values encoding for this page
- * @param dlEncoding
- */
- public Page(BytesInput bytes, int valueCount, int uncompressedSize, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
- this.bytes = bytes;
- this.valueCount = valueCount;
+ /**
+ * @param compressedSize the size in bytes of the page content as stored (possibly compressed)
+ * @param uncompressedSize the size in bytes of the page content once uncompressed
+ */
+ Page(int compressedSize, int uncompressedSize) {
+ this.compressedSize = compressedSize;
this.uncompressedSize = uncompressedSize;
- this.statistics = new BooleanStatistics();
- this.rlEncoding = rlEncoding;
- this.dlEncoding = dlEncoding;
- this.valuesEncoding = valuesEncoding;
- this.id = nextId ++;
- if (DEBUG) LOG.debug("new Page #"+id+" : " + bytes.size() + " bytes and " + valueCount + " records");
- }
- /**
- * @param bytes the bytes for this page
- * @param valueCount count of values in this page
- * @param uncompressedSize the uncompressed size of the page
- * @param statistics of the page's values (max, min, num_null)
- * @param rlEncoding the repetition level encoding for this page
- * @param dlEncoding the definition level encoding for this page
- * @param valuesEncoding the values encoding for this page
- * @param dlEncoding
- */
- public Page(BytesInput bytes, int valueCount, int uncompressedSize, Statistics stats, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
- this.bytes = bytes;
- this.valueCount = valueCount;
- this.uncompressedSize = uncompressedSize;
- this.statistics = stats;
- this.rlEncoding = rlEncoding;
- this.dlEncoding = dlEncoding;
- this.valuesEncoding = valuesEncoding;
- this.id = nextId ++;
- if (DEBUG) LOG.debug("new Page #"+id+" : " + bytes.size() + " bytes and " + valueCount + " records");
- }
- /**
- *
- * @return the bytes for the page
- */
- public BytesInput getBytes() {
- return bytes;
}
- /**
- *
- * @return the number of values in that page
- */
- public int getValueCount() {
- return valueCount;
+ /**
+ * @return the size in bytes of the page content as stored (possibly compressed)
+ */
+ public int getCompressedSize() {
+ return compressedSize;
}
- /**
- *
- * @return the uncompressed size of the page when the bytes are compressed
- */
+ /**
+ * @return the uncompressed size of the page when the bytes are compressed
+ */
public int getUncompressedSize() {
return uncompressedSize;
}
- /**
- *
- * @return the statistics for this page (max, min, num_nulls)
- */
- public Statistics getStatistics() {
- return statistics;
- }
-
- /**
- * @return the definition level encoding for this page
- */
- public Encoding getDlEncoding() {
- return dlEncoding;
- }
-
- /**
- * @return the repetition level encoding for this page
- */
- public Encoding getRlEncoding() {
- return rlEncoding;
- }
-
- /**
- * @return the values encoding for this page
- */
- public Encoding getValueEncoding() {
- return valuesEncoding;
- }
-
- @Override
- public String toString() {
- return "Page [id: " + id + ", bytes.size=" + bytes.size() + ", valueCount=" + valueCount + ", uncompressedSize=" + uncompressedSize + "]";
- }
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/PageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/PageReader.java b/parquet-column/src/main/java/parquet/column/page/PageReader.java
index f2ef171..d115037 100644
--- a/parquet-column/src/main/java/parquet/column/page/PageReader.java
+++ b/parquet-column/src/main/java/parquet/column/page/PageReader.java
@@ -36,5 +36,5 @@ public interface PageReader {
/**
- * @return the next page in that chunk or null if after the last page
+ * @return the next data page in that chunk or null if after the last page
*/
- Page readPage();
+ DataPage readPage();
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/page/PageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/parquet/column/page/PageWriter.java
index 5197e52..1d6aa52 100644
--- a/parquet-column/src/main/java/parquet/column/page/PageWriter.java
+++ b/parquet-column/src/main/java/parquet/column/page/PageWriter.java
@@ -29,29 +29,36 @@ import parquet.column.statistics.Statistics;
*/
public interface PageWriter {
- @Deprecated
/**
* writes a single page
* @param bytesInput the bytes for the page
* @param valueCount the number of values in that page
+ * @param statistics the statistics for that page
* @param rlEncoding repetition level encoding
* @param dlEncoding definition level encoding
* @param valuesEncoding values encoding
* @throws IOException
*/
- void writePage(BytesInput bytesInput, int valueCount, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+ void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
/**
- * writes a single page
- * @param bytesInput the bytes for the page
- * @param valueCount the number of values in that page
- * @param statistics the statistics for that page
- * @param rlEncoding repetition level encoding
- * @param dlEncoding definition level encoding
- * @param valuesEncoding values encoding
+ * writes a single page in the new (V2) page format
+ * @param rowCount the number of rows in this page
+ * @param nullCount the number of null values (out of valueCount)
+ * @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
+ * @param repetitionLevels the repetition levels encoded in RLE without any size header
+ * @param definitionLevels the definition levels encoded in RLE without any size header
+ * @param dataEncoding the encoding for the data
+ * @param data the data encoded with dataEncoding
+ * @param statistics optional stats for this page
* @throws IOException
*/
- void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
+ void writePageV2(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput data,
+ Statistics<?> statistics) throws IOException;
/**
* @return the current size used in the memory buffer for that column chunk
@@ -69,6 +76,10 @@ public interface PageWriter {
*/
void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException;
- public abstract String memUsageString(String prefix);
+ /**
+ * @param prefix a prefix header to add at every line
+ * @return a string presenting a summary of how memory is used
+ */
+ String memUsageString(String prefix);
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ccc29e4d/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
index a0c949c..1b674f5 100644
--- a/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
+++ b/parquet-column/src/main/java/parquet/column/values/ValuesWriter.java
@@ -66,9 +66,10 @@ public abstract class ValuesWriter {
}
/**
- *
+ * The allocated size is expected to be at least as large as
+ * {@link #getBufferedMemorySize()}.
+ *
* @return the allocated size of the buffer
- * ( > {@link #getBufferedMemorySize()() )
*/
abstract public long getAllocatedSize();