You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/04 21:35:43 UTC

incubator-parquet-mr git commit: PARQUET-188: Change column ordering to match the field order.

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master c82f70376 -> 36a02dc54


PARQUET-188: Change column ordering to match the field order.

This was the behavior before the V2 pages were added.

Author: Ryan Blue <bl...@apache.org>

Closes #129 from rdblue/PARQUET-188-fix-column-metadata-order and squashes the following commits:

3c9fa5d [Ryan Blue] PARQUET-188: Change column ordering to match the field order.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/36a02dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/36a02dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/36a02dc5

Branch: refs/heads/master
Commit: 36a02dc549f32433d7329444455dbb1be2e67f20
Parents: c82f703
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Mar 4 12:35:40 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Mar 4 12:35:40 2015 -0800

----------------------------------------------------------------------
 .../hadoop/ColumnChunkPageWriteStore.java       |  6 +-
 .../hadoop/TestColumnChunkPageWriteStore.java   | 74 ++++++++++++++++++--
 2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/36a02dc5/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
index b665089..b3cdd65 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -36,6 +36,7 @@ import parquet.column.page.DictionaryPage;
 import parquet.column.page.PageWriteStore;
 import parquet.column.page.PageWriter;
 import parquet.column.statistics.Statistics;
+import parquet.format.ColumnChunk;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.io.ParquetEncodingException;
@@ -203,8 +204,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
   }
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
 
   public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+    this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
       writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSize));
     }
@@ -216,7 +219,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
   }
 
   public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    for (ColumnChunkPageWriter pageWriter : writers.values()) {
+    for (ColumnDescriptor path : schema.getColumns()) {
+      ColumnChunkPageWriter pageWriter = writers.get(path);
       pageWriter.writeToFileWriter(writer);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/36a02dc5/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 636ee04..b1ec02e 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -19,8 +19,20 @@
 package parquet.hadoop;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static parquet.column.Encoding.PLAIN;
+import static parquet.column.Encoding.RLE;
 import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static parquet.schema.OriginalType.UTF8;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -30,8 +42,13 @@ import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.Before;
 import org.junit.Test;
 
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.mockito.verification.VerificationMode;
 import parquet.bytes.BytesInput;
 import parquet.bytes.LittleEndianDataInputStream;
 import parquet.column.ColumnDescriptor;
@@ -46,12 +63,23 @@ import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Types;
 
 public class TestColumnChunkPageWriteStore {
 
+  private int pageSize = 1024;
+  private int initialSize = 1024;
+  private Configuration conf;
+
+  @Before
+  public void initConfiguration() {
+    this.conf = new Configuration();
+  }
+
   @Test
   public void test() throws Exception {
-    Configuration conf = new Configuration();
     Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
     Path root = file.getParent();
     FileSystem fs = file.getFileSystem(conf);
@@ -59,12 +87,9 @@ public class TestColumnChunkPageWriteStore {
       fs.delete(root, true);
     }
     fs.mkdirs(root);
-    CodecFactory f = new CodecFactory(conf);
-    int pageSize = 1024;
-    int initialSize = 1024;
     MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
     ColumnDescriptor col = schema.getColumns().get(0);
-    Encoding dataEncoding = Encoding.PLAIN;
+    Encoding dataEncoding = PLAIN;
     int valueCount = 10;
     int d = 1;
     int r = 2;
@@ -75,14 +100,13 @@ public class TestColumnChunkPageWriteStore {
     BytesInput data = BytesInput.fromInt(v);
     int rowCount = 5;
     int nullCount = 1;
-    CompressionCodecName codec = GZIP;
 
     {
       ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
       writer.start();
       writer.startBlock(rowCount);
       {
-        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(f.getCompressor(codec, pageSize ), schema , initialSize);
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , initialSize);
         PageWriter pageWriter = store.getPageWriter(col);
         pageWriter.writePageV2(
             rowCount, nullCount, valueCount,
@@ -122,4 +146,40 @@ public class TestColumnChunkPageWriteStore {
     return i;
   }
 
+  @Test
+  public void testColumnOrderV1() throws IOException {
+    ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
+    InOrder inOrder = inOrder(mockFileWriter);
+    MessageType schema = Types.buildMessage()
+        .required(BINARY).as(UTF8).named("a_string")
+        .required(INT32).named("an_int")
+        .required(INT64).named("a_long")
+        .required(FLOAT).named("a_float")
+        .required(DOUBLE).named("a_double")
+        .named("order_test");
+
+    BytesInput fakeData = BytesInput.fromInt(34);
+    int fakeCount = 3;
+    BinaryStatistics fakeStats = new BinaryStatistics();
+
+    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
+        compressor(UNCOMPRESSED), schema, initialSize);
+
+    for (ColumnDescriptor col : schema.getColumns()) {
+      PageWriter pageWriter = store.getPageWriter(col);
+      pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
+    }
+
+    // flush to the mock writer
+    store.flushToFileWriter(mockFileWriter);
+
+    for (ColumnDescriptor col : schema.getColumns()) {
+      inOrder.verify(mockFileWriter).startColumn(
+          eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
+    }
+  }
+
+  private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
+    return new CodecFactory(conf).getCompressor(codec, pageSize);
+  }
 }