Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/04 21:35:43 UTC
incubator-parquet-mr git commit: PARQUET-188: Change column ordering to match the field order.
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master c82f70376 -> 36a02dc54
PARQUET-188: Change column ordering to match the field order.
This was the behavior before the V2 pages were added.
Author: Ryan Blue <bl...@apache.org>
Closes #129 from rdblue/PARQUET-188-fix-column-metadata-order and squashes the following commits:
3c9fa5d [Ryan Blue] PARQUET-188: Change column ordering to match the field order.
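
For context, the bug fixed here is visible in the diff below: flushToFileWriter iterated writers.values() on a HashMap, so column chunks were flushed in hash order rather than in the order the fields appear in the schema. A minimal standalone sketch of the problem and the fix pattern follows; plain strings stand in for ColumnDescriptor and ColumnChunkPageWriter, and all class and variable names here are illustrative, not parquet-mr API:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ColumnOrderSketch {
        public static void main(String[] args) {
            // Field order as declared in the schema (names borrowed from the new test).
            List<String> schemaOrder =
                Arrays.asList("a_string", "an_int", "a_long", "a_float", "a_double");

            Map<String, String> writers = new HashMap<String, String>();
            for (String col : schemaOrder) {
                writers.put(col, "writer-for-" + col);
            }

            // Before the fix: HashMap iteration order is unspecified, so chunks
            // could be flushed in hash order rather than field order.
            System.out.println("hash order: " + writers.keySet());

            // After the fix: iterate the schema's columns and look each writer
            // up, so the flush order always matches the field order.
            for (String col : schemaOrder) {
                System.out.println("flushing " + writers.get(col));
            }
        }
    }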
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/36a02dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/36a02dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/36a02dc5
Branch: refs/heads/master
Commit: 36a02dc549f32433d7329444455dbb1be2e67f20
Parents: c82f703
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Mar 4 12:35:40 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Mar 4 12:35:40 2015 -0800
----------------------------------------------------------------------
.../hadoop/ColumnChunkPageWriteStore.java | 6 +-
.../hadoop/TestColumnChunkPageWriteStore.java | 74 ++++++++++++++++++--
2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/36a02dc5/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index b665089..b3cdd65 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -36,6 +36,7 @@ import parquet.column.page.DictionaryPage;
import parquet.column.page.PageWriteStore;
import parquet.column.page.PageWriter;
import parquet.column.statistics.Statistics;
+import parquet.format.ColumnChunk;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactory.BytesCompressor;
import parquet.io.ParquetEncodingException;
@@ -203,8 +204,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+ private final MessageType schema;
public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+ this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
}
@@ -216,7 +219,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- for (ColumnChunkPageWriter pageWriter : writers.values()) {
+ for (ColumnDescriptor path : schema.getColumns()) {
+ ColumnChunkPageWriter pageWriter = writers.get(path);
pageWriter.writeToFileWriter(writer);
}
}
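
Note the design choice in this hunk: the writers HashMap is kept for constant-time lookup by ColumnDescriptor; only the iteration in flushToFileWriter is now driven by schema.getColumns(), which yields the columns in field order.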
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/36a02dc5/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 636ee04..b1ec02e 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -19,8 +19,20 @@
package parquet.hadoop;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.RLE;
import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static parquet.schema.OriginalType.UTF8;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -30,8 +42,13 @@ import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.mockito.verification.VerificationMode;
import parquet.bytes.BytesInput;
import parquet.bytes.LittleEndianDataInputStream;
import parquet.column.ColumnDescriptor;
@@ -46,12 +63,23 @@ import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Types;
public class TestColumnChunkPageWriteStore {
+ private int pageSize = 1024;
+ private int initialSize = 1024;
+ private Configuration conf;
+
+ @Before
+ public void initConfiguration() {
+ this.conf = new Configuration();
+ }
+
@Test
public void test() throws Exception {
- Configuration conf = new Configuration();
Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
Path root = file.getParent();
FileSystem fs = file.getFileSystem(conf);
@@ -59,12 +87,9 @@ public class TestColumnChunkPageWriteStore {
fs.delete(root, true);
}
fs.mkdirs(root);
- CodecFactory f = new CodecFactory(conf);
- int pageSize = 1024;
- int initialSize = 1024;
MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
ColumnDescriptor col = schema.getColumns().get(0);
- Encoding dataEncoding = Encoding.PLAIN;
+ Encoding dataEncoding = PLAIN;
int valueCount = 10;
int d = 1;
int r = 2;
@@ -75,14 +100,13 @@ public class TestColumnChunkPageWriteStore {
BytesInput data = BytesInput.fromInt(v);
int rowCount = 5;
int nullCount = 1;
- CompressionCodecName codec = GZIP;
{
ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
writer.start();
writer.startBlock(rowCount);
{
- ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize);
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , initialSize);
PageWriter pageWriter = store.getPageWriter(col);
pageWriter.writePageV2(
rowCount, nullCount, valueCount,
@@ -122,4 +146,40 @@ public class TestColumnChunkPageWriteStore {
return i;
}
+ @Test
+ public void testColumnOrderV1() throws IOException {
+ ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
+ InOrder inOrder = inOrder(mockFileWriter);
+ MessageType schema = Types.buildMessage()
+ .required(BINARY).as(UTF8).named("a_string")
+ .required(INT32).named("an_int")
+ .required(INT64).named("a_long")
+ .required(FLOAT).named("a_float")
+ .required(DOUBLE).named("a_double")
+ .named("order_test");
+
+ BytesInput fakeData = BytesInput.fromInt(34);
+ int fakeCount = 3;
+ BinaryStatistics fakeStats = new BinaryStatistics();
+
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
+ compressor(UNCOMPRESSED), schema, initialSize);
+
+ for (ColumnDescriptor col : schema.getColumns()) {
+ PageWriter pageWriter = store.getPageWriter(col);
+ pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
+ }
+
+ // flush to the mock writer
+ store.flushToFileWriter(mockFileWriter);
+
+ for (ColumnDescriptor col : schema.getColumns()) {
+ inOrder.verify(mockFileWriter).startColumn(
+ eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+ }
+ }
+
+ private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
+ return new CodecFactory(conf).getCompressor(codec, pageSize);
+ }
}
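
The new testColumnOrderV1 leans on Mockito's InOrder API: it writes one page per column, flushes to a mocked ParquetFileWriter, and then verifies that startColumn was invoked once per column in schema order. A minimal standalone sketch of that verification pattern, with a hypothetical Sink interface standing in for ParquetFileWriter:

    import static org.mockito.Mockito.inOrder;
    import static org.mockito.Mockito.mock;

    import org.mockito.InOrder;

    public class InOrderSketch {
        // Hypothetical collaborator standing in for ParquetFileWriter.
        interface Sink {
            void startColumn(String name);
        }

        public static void main(String[] args) {
            Sink sink = mock(Sink.class);
            InOrder inOrder = inOrder(sink);

            // Exercise the mock in a known sequence.
            sink.startColumn("a_string");
            sink.startColumn("an_int");

            // These verifications pass only because the calls happened in this
            // order; swapping the two verify lines would make Mockito throw.
            inOrder.verify(sink).startColumn("a_string");
            inOrder.verify(sink).startColumn("an_int");
        }
    }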