Posted to commits@parquet.apache.org by ga...@apache.org on 2020/12/07 11:07:32 UTC

[parquet-mr] branch master updated: PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new a6fde11  PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)
a6fde11 is described below

commit a6fde11fd11aeb19c4e23a83ee2922b9acec3251
Author: daijyc <jd...@pinterest.com>
AuthorDate: Mon Dec 7 03:07:21 2020 -0800

    PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)
    
    * PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat would produce wrong data
---
 .../mapred/DeprecatedParquetInputFormat.java       |   1 +
 .../parquet/hadoop/DeprecatedInputFormatTest.java  | 159 +++++++++++++++++++++
 2 files changed, 160 insertions(+)
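
Why the one-line fix below works: the old-API org.apache.hadoop.mapred.lib.CombineFileRecordReader calls createKey()/createValue() once, on the first underlying reader, and then reuses that same key/value pair for every file in the combined split. DeprecatedParquetInputFormat's reader buffers its first record into its own value container while it is being constructed; before this patch, next() returned true for that first record without copying the buffered value into the caller-supplied container. Used standalone that was harmless, because createValue() hands out the reader's own container; under CombineFileInputFormat the container belongs to the first reader in the combined split, so the first record of every subsequent file was silently replaced by stale data. The following is a minimal, Hadoop-free sketch of that interaction; all names in it (Box, EagerReader, Parquet1947Sketch) are illustrative stand-ins, not the real parquet-mr classes.

    import java.util.Iterator;
    import java.util.List;

    // Illustrative stand-in for the value container the real reader fills.
    class Box<T> {
      private T v;
      T get() { return v; }
      void set(T v) { this.v = v; }
    }

    // Stand-in for DeprecatedParquetInputFormat's reader: it buffers its first
    // record while being constructed, the exact behavior the patch addresses.
    class EagerReader {
      private final Iterator<String> records;
      private final Box<String> valueContainer = new Box<>();
      private boolean firstRecord;

      EagerReader(List<String> split) {
        records = split.iterator();
        if (records.hasNext()) {
          valueContainer.set(records.next());
          firstRecord = true;
        }
      }

      boolean next(Box<String> value) {
        if (firstRecord) { // key & value are already read.
          value.set(valueContainer.get()); // the PARQUET-1947 fix: publish the
          firstRecord = false;             // buffered record into the caller's
          return true;                     // (possibly foreign) container
        }
        if (!records.hasNext()) {
          return false;
        }
        value.set(records.next());
        return true;
      }
    }

    public class Parquet1947Sketch {
      public static void main(String[] args) {
        // Like CombineFileRecordReader, create ONE container and reuse it
        // for every file in the combined split.
        Box<String> shared = new Box<>();
        for (List<String> split : List.of(List.of("hello"), List.of("world"))) {
          EagerReader reader = new EagerReader(split);
          while (reader.next(shared)) {
            System.out.println(shared.get()); // "hello", then "world"
          }
        }
      }
    }

Deleting the value.set(valueContainer.get()) line reproduces the bug in miniature: the shared container never receives either split's first record.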

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index 6b69bc9..3afdc7e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -151,6 +151,7 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
       }
 
       if (firstRecord) { // key & value are already read.
+        value.set(valueContainer.get());
         firstRecord = false;
         return true;
       }
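
Context for the test diff that follows: the old-API org.apache.hadoop.mapred.lib.CombineFileRecordReader constructs its per-file readers reflectively, resolving a constructor with the exact parameter list (CombineFileSplit, Configuration, Reporter, Integer); a wrapper missing that constructor fails as soon as the record reader is created. That is why the CombineFileRecordReaderWrapper added below declares precisely this signature. A minimal sketch of the same lookup, assuming a Hadoop client jar on the classpath (ConstructorContract and MyWrapper are hypothetical names):

    import java.lang.reflect.Constructor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    public class ConstructorContract {
      // Hypothetical wrapper exposing the constructor shape that
      // CombineFileRecordReader requires of the class it is handed.
      static class MyWrapper {
        public MyWrapper(CombineFileSplit split, Configuration conf,
                         Reporter reporter, Integer idx) { }
      }

      public static void main(String[] args) throws NoSuchMethodException {
        // The same lookup CombineFileRecordReader performs internally.
        Constructor<MyWrapper> ctor = MyWrapper.class.getDeclaredConstructor(
            CombineFileSplit.class, Configuration.class,
            Reporter.class, Integer.class);
        System.out.println("wrapper contract satisfied: " + ctor);
      }
    }
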
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
index 92238de..58f395e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.parquet.hadoop;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -41,7 +46,16 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.schema.MessageTypeParser;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Set;
 
 import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
@@ -111,6 +125,151 @@ public class DeprecatedInputFormatTest {
     }
   }
 
+  // This RecordReader implementation simulates Cascading 2's behavior:
+  // https://github.com/Cascading/cascading/blob/2.6/cascading-hadoop/
+  // src/main/shared/cascading/tap/hadoop/io/CombineFileRecordReaderWrapper.java
+  static class CombineFileRecordReaderWrapper<K, V> implements RecordReader<K, V>
+  {
+    private final RecordReader<K, V> delegate;
+
+    public CombineFileRecordReaderWrapper( CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx ) throws Exception
+    {
+      FileSplit fileSplit = new FileSplit(
+        split.getPath( idx ),
+        split.getOffset( idx ),
+        split.getLength( idx ),
+        split.getLocations()
+      );
+
+      delegate = new DeprecatedParquetInputFormat().getRecordReader( fileSplit, (JobConf) conf, reporter );
+    }
+
+    public boolean next( K key, V value ) throws IOException
+    {
+      return delegate.next( key, value );
+    }
+
+    public K createKey()
+    {
+      return delegate.createKey();
+    }
+
+    public V createValue()
+    {
+      return delegate.createValue();
+    }
+
+    public long getPos() throws IOException
+    {
+      return delegate.getPos();
+    }
+
+    public void close() throws IOException
+    {
+      delegate.close();
+    }
+
+    public float getProgress() throws IOException
+    {
+      return delegate.getProgress();
+    }
+  }
+
+  // This InputFormat implementation simulates Cascading 2's Hfs:
+  // https://github.com/Cascading/cascading/blob/2.6/cascading-hadoop/
+  // src/main/shared/cascading/tap/hadoop/Hfs.java#L773
+  static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
+  {
+    private Configuration conf;
+    public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
+    {
+      return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
+    }
+    @Override
+    public void setConf( Configuration conf )
+    {
+      this.conf = conf;
+    }
+    @Override
+    public Configuration getConf()
+    {
+      return conf;
+    }
+  }
+
+  class PartFileFilter implements FilenameFilter {
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith("part");
+    }
+  }
+
+  private File createParquetFile(String content)
+    throws IOException, ClassNotFoundException, InterruptedException {
+    File inputFile = File.createTempFile("temp", null);
+    File outputFile = File.createTempFile("temp", null);
+    outputFile.delete();
+    PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
+    pw.println(content);
+    pw.close();
+    writeJob = new Job(conf, "write");
+
+    TextInputFormat.addInputPath(writeJob, new Path(inputFile.toURI()));
+    writeJob.setInputFormatClass(TextInputFormat.class);
+    writeJob.setNumReduceTasks(0);
+    ExampleOutputFormat.setOutputPath(writeJob, new Path(outputFile.toURI()));
+    writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+    writeJob.setMapperClass(ReadMapper.class);
+    ExampleOutputFormat.setSchema(
+      writeJob,
+      MessageTypeParser.parseMessageType(
+        writeSchema));
+    writeJob.submit();
+    waitForJob(writeJob);
+    File partFile = outputFile.listFiles(new PartFileFilter())[0];
+    inputFile.delete();
+    return partFile;
+  }
+
+  @Test
+  public void testCombineParquetInputFormat() throws Exception {
+    File inputDir = File.createTempFile("temp", null);
+    inputDir.delete();
+    inputDir.mkdirs();
+    File parquetFile1 = createParquetFile("hello");
+    File parquetFile2 = createParquetFile("world");
+    Files.move(parquetFile1.toPath(), new File(inputDir, "1").toPath());
+    Files.move(parquetFile2.toPath(), new File(inputDir, "2").toPath());
+
+    File outputDir = File.createTempFile("temp", null);
+    outputDir.delete();
+    org.apache.hadoop.mapred.JobConf conf
+      = new org.apache.hadoop.mapred.JobConf(DeprecatedInputFormatTest.class);
+    conf.setInputFormat(CombinedInputFormat.class);
+    conf.setNumReduceTasks(0);
+    conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
+    conf.setMapperClass(DeprecatedWriteMapper.class);
+    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path(inputDir.toURI()));
+    org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(conf, new Path(outputDir.toURI()));
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, GroupReadSupport.class.getCanonicalName());
+    JobClient.runJob(conf);
+    File partFile = outputDir.listFiles(new PartFileFilter())[0];
+    BufferedReader br = new BufferedReader(new FileReader(partFile));
+    String line;
+    Set<String> s = new HashSet<String>();
+    while ((line = br.readLine()) != null) {
+      s.add(line.split("\t")[1]);
+    }
+    assertEquals(2, s.size());
+    assertTrue(s.contains("hello"));
+    assertTrue(s.contains("world"));
+    FileUtils.deleteDirectory(inputDir);
+    FileUtils.deleteDirectory(outputDir);
+  }
+
   @Test
   public void testReadWriteWithCountDeprecated() throws Exception {
     runMapReduceJob(CompressionCodecName.GZIP);
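
To run just the new test locally, something like the following should work, assuming the standard Maven Surefire setup of the parquet-mr repository:

    mvn test -pl parquet-hadoop -Dtest=DeprecatedInputFormatTest#testCombineParquetInputFormat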