Posted to commits@parquet.apache.org by bl...@apache.org on 2016/10/19 00:45:37 UTC
parquet-mr git commit: PARQUET-751: Add setRequestedSchema to ParquetFileReader.
Repository: parquet-mr
Updated Branches:
refs/heads/master 1058b7d98 -> ece4b70cc
PARQUET-751: Add setRequestedSchema to ParquetFileReader.
This fixes a bug introduced by the dictionary filter change, which reused an
existing file reader to avoid opening multiple input streams. Before that
commit, a new file reader was opened and passed the projection columns from
the read context; the reused reader never received the projection, so it read
every column. The fix is to set the requested schema on the existing file
reader instead of creating a new instance.

This also adds a column projection test to catch bugs like this in the future.
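In code terms, the broken path constructed a second ParquetFileReader just to
carry the projection, while the fixed path reuses the reader already opened
for dictionary filtering. A minimal sketch of the corrected flow, assuming
the variable names from InternalParquetRecordReader.initialize (the fragment
is illustrative, not a verbatim excerpt from the patch):

// Illustrative fragment: reuse the reader opened for dictionary filtering
// and apply the projection from the read context to it, instead of
// constructing a second ParquetFileReader (and a second input stream).
MessageType requestedSchema = readContext.getRequestedSchema();
reader.setRequestedSchema(requestedSchema);      // the one-line fix
PageReadStore pages = reader.readNextRowGroup(); // pages for projected columns only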
Author: Ryan Blue <bl...@apache.org>
Closes #379 from rdblue/PARQUET-751-fix-column-projection and squashes the following commits:
7ea0c16 [Ryan Blue] PARQUET-751: Fix column projection test.
1da507e [Ryan Blue] PARQUET-751: Add setRequestedSchema to ParquetFileReader.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/ece4b70c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/ece4b70c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/ece4b70c
Branch: refs/heads/master
Commit: ece4b70cce24b89483236b4cff079c10597d680a
Parents: 1058b7d
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Oct 18 17:45:32 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Oct 18 17:45:32 2016 -0700
----------------------------------------------------------------------
.../hadoop/InternalParquetRecordReader.java | 1 +
.../parquet/hadoop/ParquetFileReader.java | 8 +
.../hadoop/TestInputFormatColumnProjection.java | 180 +++++++++++++++++++
3 files changed, 189 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index d43fd7d..85b6691 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -179,6 +179,7 @@ class InternalParquetRecordReader<T> {
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
this.filterRecords = configuration.getBoolean(
RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
+ reader.setRequestedSchema(requestedSchema);
LOG.info("RecordReader initialized will read a total of " + total + " records.");
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 9e95535..4af26d0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -95,6 +95,7 @@ import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
/**
@@ -674,6 +675,13 @@ public class ParquetFileReader implements Closeable {
return blocks;
}
+ public void setRequestedSchema(MessageType projection) {
+ paths.clear();
+ for (ColumnDescriptor col : projection.getColumns()) {
+ paths.put(ColumnPath.get(col.getPath()), col);
+ }
+ }
+
public void appendTo(ParquetFileWriter writer) throws IOException {
writer.appendRowGroups(f, blocks, true);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ece4b70c/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
new file mode 100644
index 0000000..a6d2732
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleInputFormat;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.lang.Thread.sleep;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+public class TestInputFormatColumnProjection {
+ public static final String FILE_CONTENT = "" +
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+ public static MessageType PARQUET_TYPE = Types.buildMessage()
+ .required(BINARY).as(UTF8).named("uuid")
+ .required(BINARY).as(UTF8).named("char")
+ .named("FormatTestObject");
+
+ public static class Writer extends Mapper<LongWritable, Text, Void, Group> {
+ public static final SimpleGroupFactory GROUP_FACTORY = new SimpleGroupFactory(PARQUET_TYPE);
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ // writes each character of the line with a UUID
+ String line = value.toString();
+ for (int i = 0; i < line.length(); i += 1) {
+ Group group = GROUP_FACTORY.newGroup();
+ group.add(0, Binary.fromString(UUID.randomUUID().toString()));
+ group.add(1, Binary.fromString(line.substring(i, i+1)));
+ context.write(null, group);
+ }
+ }
+ }
+
+ public static class Reader extends Mapper<Void, Group, LongWritable, Text> {
+
+ public static Counter bytesReadCounter = null;
+ public static void setBytesReadCounter(Counter bytesRead) {
+ bytesReadCounter = bytesRead;
+ }
+
+ @Override
+ protected void map(Void key, Group value, Context context)
+ throws IOException, InterruptedException {
+    // No output; just capture the bytes-read counter for later verification.
+ setBytesReadCounter(ContextUtil.getCounter(
+ context, "parquet", "bytesread"));
+ }
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testProjectionSize() throws Exception {
+ Assume.assumeTrue( // only run this test for Hadoop 2
+ org.apache.hadoop.mapreduce.JobContext.class.isInterface());
+
+ File inputFile = temp.newFile();
+ FileOutputStream out = new FileOutputStream(inputFile);
+ out.write(FILE_CONTENT.getBytes("UTF-8"));
+ out.close();
+
+ File tempFolder = temp.newFolder();
+ tempFolder.delete();
+ Path tempPath = new Path(tempFolder.toURI());
+
+ File outputFolder = temp.newFile();
+ outputFolder.delete();
+
+ Configuration conf = new Configuration();
+ // set the projection schema
+ conf.set("parquet.read.schema", Types.buildMessage()
+ .required(BINARY).as(UTF8).named("char")
+ .named("FormatTestObject").toString());
+
+ // disable summary metadata, it isn't needed
+ conf.set("parquet.enable.summary-metadata", "false");
+ conf.set("parquet.example.schema", PARQUET_TYPE.toString());
+
+ {
+ Job writeJob = new Job(conf, "write");
+ writeJob.setInputFormatClass(TextInputFormat.class);
+ TextInputFormat.addInputPath(writeJob, new Path(inputFile.toString()));
+
+ writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+ writeJob.setMapperClass(Writer.class);
+ writeJob.setNumReduceTasks(0); // write directly to Parquet without reduce
+ ParquetOutputFormat.setBlockSize(writeJob, 10240);
+ ParquetOutputFormat.setPageSize(writeJob, 512);
+ ParquetOutputFormat.setDictionaryPageSize(writeJob, 1024);
+ ParquetOutputFormat.setEnableDictionary(writeJob, true);
+ ParquetOutputFormat.setMaxPaddingSize(writeJob, 1023); // always pad
+ ParquetOutputFormat.setOutputPath(writeJob, tempPath);
+
+ waitForJob(writeJob);
+ }
+
+ long bytesWritten = 0;
+ FileSystem fs = FileSystem.getLocal(conf);
+ for (FileStatus file : fs.listStatus(tempPath)) {
+ bytesWritten += file.getLen();
+ }
+
+ long bytesRead;
+ {
+ Job readJob = new Job(conf, "read");
+ readJob.setInputFormatClass(ExampleInputFormat.class);
+ TextInputFormat.addInputPath(readJob, tempPath);
+
+ readJob.setOutputFormatClass(TextOutputFormat.class);
+ readJob.setMapperClass(Reader.class);
+ readJob.setNumReduceTasks(0); // no reduce phase
+ TextOutputFormat.setOutputPath(readJob, new Path(outputFolder.toString()));
+
+ waitForJob(readJob);
+
+ bytesRead = Reader.bytesReadCounter.getValue();
+ }
+
+ Assert.assertTrue("Should read less than 10% of the input file size",
+ bytesRead < (bytesWritten / 10));
+ }
+
+ private void waitForJob(Job job) throws Exception {
+ job.submit();
+ while (!job.isComplete()) {
+ sleep(100);
+ }
+ if (!job.isSuccessful()) {
+ throw new RuntimeException("job failed " + job.getJobName());
+ }
+ }
+}