Posted to commits@parquet.apache.org by ga...@apache.org on 2020/12/07 11:07:32 UTC
[parquet-mr] branch master updated: PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new a6fde11 PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)
a6fde11 is described below
commit a6fde11fd11aeb19c4e23a83ee2922b9acec3251
Author: daijyc <jd...@pinterest.com>
AuthorDate: Mon Dec 7 03:07:21 2020 -0800
PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat … (#844)
* PARQUET-1947: DeprecatedParquetInputFormat in CombineFileInputFormat would produce wrong data
---
.../mapred/DeprecatedParquetInputFormat.java | 1 +
.../parquet/hadoop/DeprecatedInputFormatTest.java | 159 +++++++++++++++++++++
2 files changed, 160 insertions(+)
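For context, the one-line fix addresses an object-aliasing problem: DeprecatedParquetInputFormat's record reader pre-reads the first record into an internal container at construction time, and next() used to return true for that first record without copying anything into the caller-supplied value object. That is harmless when the caller reuses the object returned by createValue(), but CombineFileRecordReader builds the reusable key/value pair from its first delegate only, so the pre-read first record of every subsequent file was silently dropped. The following is a minimal sketch of the aliasing issue; Holder, Reader, and Demo are hypothetical stand-in classes, not parquet-mr types.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-in for the wrapper's internal value container (hypothetical type).
class Holder {
    private String value;
    void set(String v) { this.value = v; }
    String get() { return value; }
}

// Simplified stand-in for DeprecatedParquetInputFormat's record reader:
// the first record is read eagerly, into an internal container.
class Reader {
    private final Holder container = new Holder();
    private final Iterator<String> records;
    private boolean firstRecord;

    Reader(List<String> file) {
        records = file.iterator();
        if (records.hasNext()) {
            container.set(records.next()); // pre-read at construction time
            firstRecord = true;
        }
    }

    Holder createValue() {
        return container; // direct callers end up aliasing the container
    }

    boolean next(Holder value) {
        if (firstRecord) {
            value.set(container.get()); // the PARQUET-1947 fix: copy the
                                        // pre-read record into the caller's
                                        // object, which may be a different
                                        // instance than 'container'
            firstRecord = false;
            return true;
        }
        if (!records.hasNext()) {
            return false;
        }
        value.set(records.next());
        return true;
    }
}

public class Demo {
    public static void main(String[] args) {
        Reader r1 = new Reader(Arrays.asList("hello"));
        Reader r2 = new Reader(Arrays.asList("world"));
        // CombineFileRecordReader builds the reusable value object from the
        // FIRST delegate only and hands that same object to every delegate:
        Holder value = r1.createValue();
        r1.next(value);
        System.out.println(value.get()); // hello
        r2.next(value);                  // without the fix, r2's pre-read
                                         // record never reached 'value'
        System.out.println(value.get()); // world (was "hello" before the fix)
    }
}

This is also why the new test below writes two single-record Parquet files and asserts that both "hello" and "world" come back through CombinedInputFormat.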
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index 6b69bc9..3afdc7e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -151,6 +151,7 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
}
if (firstRecord) { // key & value are already read.
+ value.set(valueContainer.get());
firstRecord = false;
return true;
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
index 92238de..58f395e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java
@@ -18,12 +18,17 @@
*/
package org.apache.parquet.hadoop;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -41,7 +46,16 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.schema.MessageTypeParser;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.HashSet;
+import java.util.Set;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
@@ -111,6 +125,151 @@ public class DeprecatedInputFormatTest {
}
}
+ // This RecordReader implementation simulates the Cascading 2 behavior:
+ // https://github.com/Cascading/cascading/blob/2.6/cascading-hadoop/
+ // src/main/shared/cascading/tap/hadoop/io/CombineFileRecordReaderWrapper.java
+ static class CombineFileRecordReaderWrapper<K, V> implements RecordReader<K, V>
+ {
+ private final RecordReader<K, V> delegate;
+
+ public CombineFileRecordReaderWrapper( CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx ) throws Exception
+ {
+ FileSplit fileSplit = new FileSplit(
+ split.getPath( idx ),
+ split.getOffset( idx ),
+ split.getLength( idx ),
+ split.getLocations()
+ );
+
+ delegate = new DeprecatedParquetInputFormat().getRecordReader( fileSplit, (JobConf) conf, reporter );
+ }
+
+ public boolean next( K key, V value ) throws IOException
+ {
+ return delegate.next( key, value );
+ }
+
+ public K createKey()
+ {
+ return delegate.createKey();
+ }
+
+ public V createValue()
+ {
+ return delegate.createValue();
+ }
+
+ public long getPos() throws IOException
+ {
+ return delegate.getPos();
+ }
+
+ public void close() throws IOException
+ {
+ delegate.close();
+ }
+
+ public float getProgress() throws IOException
+ {
+ return delegate.getProgress();
+ }
+ }
+
+ // This InputFormat implementation simulates the Cascading 2 behavior:
+ // https://github.com/Cascading/cascading/blob/2.6/cascading-hadoop/
+ // src/main/shared/cascading/tap/hadoop/Hfs.java#L773
+ static class CombinedInputFormat extends CombineFileInputFormat implements Configurable
+ {
+ private Configuration conf;
+ public RecordReader getRecordReader( InputSplit split, JobConf job, Reporter reporter ) throws IOException
+ {
+ return new CombineFileRecordReader( job, (CombineFileSplit) split, reporter, CombineFileRecordReaderWrapper.class );
+ }
+ @Override
+ public void setConf( Configuration conf )
+ {
+ this.conf = conf;
+ }
+ @Override
+ public Configuration getConf()
+ {
+ return conf;
+ }
+ }
+
+ class PartFileFilter implements FilenameFilter {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("part");
+ }
+ }
+
+ private File createParquetFile(String content)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ File inputFile = File.createTempFile("temp", null);
+ File outputFile = File.createTempFile("temp", null);
+ outputFile.delete();
+ PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
+ pw.println(content);
+ pw.close();
+ writeJob = new Job(conf, "write");
+
+ TextInputFormat.addInputPath(writeJob, new Path(inputFile.toURI()));
+ writeJob.setInputFormatClass(TextInputFormat.class);
+ writeJob.setNumReduceTasks(0);
+ ExampleOutputFormat.setOutputPath(writeJob, new Path(outputFile.toURI()));
+ writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+ writeJob.setMapperClass(ReadMapper.class);
+ ExampleOutputFormat.setSchema(
+ writeJob,
+ MessageTypeParser.parseMessageType(
+ writeSchema));
+ writeJob.submit();
+ waitForJob(writeJob);
+ File partFile = outputFile.listFiles(new PartFileFilter())[0];
+ inputFile.delete();
+ return partFile;
+ }
+
+ @Test
+ public void testCombineParquetInputFormat() throws Exception {
+ File inputDir = File.createTempFile("temp", null);
+ inputDir.delete();
+ inputDir.mkdirs();
+ File parquetFile1 = createParquetFile("hello");
+ File parquetFile2 = createParquetFile("world");
+ Files.move(parquetFile1.toPath(), new File(inputDir, "1").toPath());
+ Files.move(parquetFile2.toPath(), new File(inputDir, "2").toPath());
+
+ File outputDir = File.createTempFile("temp", null);
+ outputDir.delete();
+ org.apache.hadoop.mapred.JobConf conf
+ = new org.apache.hadoop.mapred.JobConf(DeprecatedInputFormatTest.class);
+ conf.setInputFormat(CombinedInputFormat.class);
+ conf.setNumReduceTasks(0);
+ conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
+ conf.setMapperClass(DeprecatedWriteMapper.class);
+ org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path(inputDir.toURI()));
+ org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(conf, new Path(outputDir.toURI()));
+ conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, GroupReadSupport.class.getCanonicalName());
+ JobClient.runJob(conf);
+ File partFile = outputDir.listFiles(new PartFileFilter())[0];
+ BufferedReader br = new BufferedReader(new FileReader(partFile));
+ String line;
+ Set<String> s = new HashSet<String>();
+ while ((line = br.readLine()) != null) {
+ s.add(line.split("\t")[1]);
+ }
+ assertEquals(2, s.size());
+ assertTrue(s.contains("hello"));
+ assertTrue(s.contains("world"));
+ FileUtils.deleteDirectory(inputDir);
+ FileUtils.deleteDirectory(outputDir);
+ }
+
@Test
public void testReadWriteWithCountDeprecated() throws Exception {
runMapReduceJob(CompressionCodecName.GZIP);