Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/19 00:45:37 UTC

parquet-mr git commit: PARQUET-751: Add setRequestedSchema to ParquetFileReader.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 1058b7d98 -> ece4b70cc


PARQUET-751: Add setRequestedSchema to ParquetFileReader.

This fixes a bug introduced by dictionary filters, which reused an
existing file reader to avoid opening multiple input streams. Before
that commit, a new file reader was opened and passed the projection
columns from the read context. The fix is to set the requested schema on
the file reader instead of creating a new instance.

This also adds a test that verifies column projection works, so that
bugs like this are caught in the future.
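
A minimal sketch of how a caller might use the new method after this
change, assuming a ParquetFileReader has already been opened for the
file; the helper class and method names below are illustrative only,
and the projection reuses the "char" column from the test schema:

    import java.io.IOException;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.Types;
    import static org.apache.parquet.schema.OriginalType.UTF8;
    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

    class ProjectionSketch {
      // Reuses the already-open reader for a projected read instead of
      // constructing a second reader (and a second input stream).
      static long countProjectedRows(ParquetFileReader reader) throws IOException {
        MessageType projection = Types.buildMessage()
            .required(BINARY).as(UTF8).named("char")
            .named("FormatTestObject");
        // after this commit: swap the requested columns on the same reader
        reader.setRequestedSchema(projection);

        long rows = 0;
        PageReadStore pages;
        while ((pages = reader.readNextRowGroup()) != null) {
          // only column chunks for the projected column are fetched
          rows += pages.getRowCount();
        }
        return rows;
      }
    }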

Author: Ryan Blue <bl...@apache.org>

Closes #379 from rdblue/PARQUET-751-fix-column-projection and squashes the following commits:

7ea0c16 [Ryan Blue] PARQUET-751: Fix column projection test.
1da507e [Ryan Blue] PARQUET-751: Add setRequestedSchema to ParquetFileReader.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/ece4b70c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/ece4b70c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/ece4b70c

Branch: refs/heads/master
Commit: ece4b70cce24b89483236b4cff079c10597d680a
Parents: 1058b7d
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Oct 18 17:45:32 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Oct 18 17:45:32 2016 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordReader.java     |   1 +
 .../parquet/hadoop/ParquetFileReader.java       |   8 +
 .../hadoop/TestInputFormatColumnProjection.java | 180 +++++++++++++++++++
 3 files changed, 189 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index d43fd7d..85b6691 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -179,6 +179,7 @@ class InternalParquetRecordReader<T> {
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
     this.filterRecords = configuration.getBoolean(
         RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
+    reader.setRequestedSchema(requestedSchema);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 9e95535..4af26d0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -95,6 +95,7 @@ import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
 /**
@@ -674,6 +675,13 @@ public class ParquetFileReader implements Closeable {
     return blocks;
   }
 
+  public void setRequestedSchema(MessageType projection) {
+    paths.clear();
+    for (ColumnDescriptor col : projection.getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+  }
+
   public void appendTo(ParquetFileWriter writer) throws IOException {
     writer.appendRowGroups(f, blocks, true);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
new file mode 100644
index 0000000..a6d2732
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
@@ -0,0 +1,180 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleInputFormat;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.lang.Thread.sleep;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+public class TestInputFormatColumnProjection {
+  public static final String FILE_CONTENT = "" +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+  public static MessageType PARQUET_TYPE = Types.buildMessage()
+      .required(BINARY).as(UTF8).named("uuid")
+      .required(BINARY).as(UTF8).named("char")
+      .named("FormatTestObject");
+
+  public static class Writer extends Mapper<LongWritable, Text, Void, Group> {
+    public static final SimpleGroupFactory GROUP_FACTORY = new SimpleGroupFactory(PARQUET_TYPE);
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      // writes each character of the line with a UUID
+      String line = value.toString();
+      for (int i = 0; i < line.length(); i += 1) {
+        Group group = GROUP_FACTORY.newGroup();
+        group.add(0, Binary.fromString(UUID.randomUUID().toString()));
+        group.add(1, Binary.fromString(line.substring(i, i+1)));
+        context.write(null, group);
+      }
+    }
+  }
+
+  public static class Reader extends Mapper<Void, Group, LongWritable, Text> {
+
+    public static Counter bytesReadCounter = null;
+    public static void setBytesReadCounter(Counter bytesRead) {
+      bytesReadCounter = bytesRead;
+    }
+
+    @Override
+    protected void map(Void key, Group value, Context context)
+        throws IOException, InterruptedException {
+      // Don't emit any output; just capture the parquet "bytesread" counter for verification.
+      setBytesReadCounter(ContextUtil.getCounter(
+          context, "parquet", "bytesread"));
+    }
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testProjectionSize() throws Exception {
+    Assume.assumeTrue( // only run this test for Hadoop 2
+        org.apache.hadoop.mapreduce.JobContext.class.isInterface());
+
+    File inputFile = temp.newFile();
+    FileOutputStream out = new FileOutputStream(inputFile);
+    out.write(FILE_CONTENT.getBytes("UTF-8"));
+    out.close();
+
+    File tempFolder = temp.newFolder();
+    tempFolder.delete();
+    Path tempPath = new Path(tempFolder.toURI());
+
+    File outputFolder = temp.newFile();
+    outputFolder.delete();
+
+    Configuration conf = new Configuration();
+    // set the projection schema
+    conf.set("parquet.read.schema", Types.buildMessage()
+        .required(BINARY).as(UTF8).named("char")
+        .named("FormatTestObject").toString());
+
+    // disable summary metadata, it isn't needed
+    conf.set("parquet.enable.summary-metadata", "false");
+    conf.set("parquet.example.schema", PARQUET_TYPE.toString());
+
+    {
+      Job writeJob = new Job(conf, "write");
+      writeJob.setInputFormatClass(TextInputFormat.class);
+      TextInputFormat.addInputPath(writeJob, new Path(inputFile.toString()));
+
+      writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+      writeJob.setMapperClass(Writer.class);
+      writeJob.setNumReduceTasks(0); // write directly to Parquet without reduce
+      ParquetOutputFormat.setBlockSize(writeJob, 10240);
+      ParquetOutputFormat.setPageSize(writeJob, 512);
+      ParquetOutputFormat.setDictionaryPageSize(writeJob, 1024);
+      ParquetOutputFormat.setEnableDictionary(writeJob, true);
+      ParquetOutputFormat.setMaxPaddingSize(writeJob, 1023); // always pad
+      ParquetOutputFormat.setOutputPath(writeJob, tempPath);
+
+      waitForJob(writeJob);
+    }
+
+    long bytesWritten = 0;
+    FileSystem fs = FileSystem.getLocal(conf);
+    for (FileStatus file : fs.listStatus(tempPath)) {
+      bytesWritten += file.getLen();
+    }
+
+    long bytesRead;
+    {
+      Job readJob = new Job(conf, "read");
+      readJob.setInputFormatClass(ExampleInputFormat.class);
+      TextInputFormat.addInputPath(readJob, tempPath);
+
+      readJob.setOutputFormatClass(TextOutputFormat.class);
+      readJob.setMapperClass(Reader.class);
+      readJob.setNumReduceTasks(0); // no reduce phase
+      TextOutputFormat.setOutputPath(readJob, new Path(outputFolder.toString()));
+
+      waitForJob(readJob);
+
+      bytesRead = Reader.bytesReadCounter.getValue();
+    }
+
+    Assert.assertTrue("Should read less than 10% of the input file size",
+        bytesRead < (bytesWritten / 10));
+  }
+
+  private void waitForJob(Job job) throws Exception {
+    job.submit();
+    while (!job.isComplete()) {
+      sleep(100);
+    }
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+}