Posted to commits@orc.apache.org by do...@apache.org on 2021/09/06 21:41:18 UTC

[orc] branch main updated: ORC-715: Add MapReduce test case (#897)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new a255164  ORC-715: Add MapReduce test case (#897)
a255164 is described below

commit a255164e8d90f90bc6382efad06f8b93f28ca391
Author: Guiyanakaung <gu...@gmail.com>
AuthorDate: Tue Sep 7 05:41:14 2021 +0800

    ORC-715: Add MapReduce test case (#897)
    
    ### What changes were proposed in this pull request?
    
    This PR aims to recover test coverage that was removed in
    https://github.com/apache/orc/commit/db62ea5730797b046a3b7d5d1a3b026b08921209
    
    It follows the Hadoop example and the previous MapReduce tests:
    https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMapReduce.java
    https://github.com/apache/orc/pull/612/files#diff-15999a33f815f28f3251acabae2a16dd2f1114d8c73e6fab26c207c08884ce68
    
    In addition, this PR bypasses a bug in older versions of hadoop-common by configuring `mapreduce.job.output.key.comparator.class`, which fixes the [ORC-964](https://issues.apache.org/jira/projects/ORC/issues/ORC-964?filter=allopenissues) issue.
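    
    As an illustrative sketch (not part of the commit itself), setting this property is equivalent to calling `Job#setSortComparatorClass` and points the shuffle at the `OrcKeyComparator` class added by the new test:
    
    ```java
    JobConf conf = new JobConf();
    // Sort map output keys with OrcKeyComparator, which deserializes
    // both OrcKeys before comparing them, instead of relying on the
    // default comparator lookup that is buggy in older hadoop-common.
    conf.set("mapreduce.job.output.key.comparator.class",
        TestMrUnit.OrcKeyComparator.class.getName());
    ```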
    
    ### Why are the changes needed?
    
    To recover test coverage.
    
    ### How was this patch tested?
    
    Pass the CIs.
---
 java/mapreduce/pom.xml                             |  17 ++
 .../test/org/apache/orc/mapreduce/TestMrUnit.java  | 245 +++++++++++++++++++++
 2 files changed, 262 insertions(+)

diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml
index 0cd56ef..17db8d3 100644
--- a/java/mapreduce/pom.xml
+++ b/java/mapreduce/pom.xml
@@ -74,6 +74,12 @@
 
     <!-- test inter-project -->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <version>${min.hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
       <scope>test</scope>
@@ -114,6 +120,17 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <ignoredUnusedDeclaredDependencies>
+            <ignoredUnusedDeclaredDependency>
+              org.apache.hadoop:hadoop-mapreduce-client-jobclient
+            </ignoredUnusedDeclaredDependency>
+          </ignoredUnusedDeclaredDependencies>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
new file mode 100644
index 0000000..e47b22d
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestMrUnit {
+
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")), "TestMapReduce-mapreduce");
+  private static FileSystem FS;
+
+  private static final JobConf CONF = new JobConf();
+
+  private static final TypeDescription INPUT_SCHEMA = TypeDescription
+      .fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>");
+
+  private static final TypeDescription OUT_SCHEMA = TypeDescription
+      .fromString("struct<first:struct<x:int,y:int>,second:struct<z:string>>");
+
+  static {
+    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(CONF, "struct<x:int,y:int>");
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(CONF, "struct<z:string>");
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(CONF, OUT_SCHEMA.toString());
+    // This is required due to ORC-964
+    CONF.set("mapreduce.job.output.key.comparator.class", OrcKeyComparator.class.getName());
+    try {
+      FS = FileSystem.getLocal(CONF);
+    } catch (IOException ioe) {
+      FS = null;
+    }
+  }
+
+  public static class OrcKeyComparator implements RawComparator<OrcKey>, JobConfigurable {
+
+    private JobConf jobConf;
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      DataInputBuffer buffer1 = new DataInputBuffer();
+      DataInputBuffer buffer2 = new DataInputBuffer();
+
+      try {
+        buffer1.reset(b1, s1, l1);
+        buffer2.reset(b2, s2, l2);
+        OrcKey orcKey1 = new OrcKey();
+        orcKey1.configure(this.jobConf);
+        orcKey1.readFields(buffer1);
+        OrcKey orcKey2 = new OrcKey();
+        orcKey2.configure(this.jobConf);
+        orcKey2.readFields(buffer2);
+        return orcKey1.compareTo(orcKey2);
+      } catch (IOException e) {
+        throw new RuntimeException("compare orcKey fail", e);
+      }
+    }
+
+    @Override
+    public int compare(OrcKey o1, OrcKey o2) {
+      return o1.compareTo(o2);
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+  }
+
+
+  /**
+   * Split the input struct into its two parts.
+   */
+  public static class MyMapper
+      extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+    private final OrcKey keyWrapper = new OrcKey();
+    private final OrcValue valueWrapper = new OrcValue();
+
+    @Override
+    protected void map(NullWritable key,
+                       OrcStruct value,
+                       Context context
+    ) throws IOException, InterruptedException {
+      keyWrapper.key = value.getFieldValue(0);
+      valueWrapper.value = value.getFieldValue(1);
+      context.write(keyWrapper, valueWrapper);
+    }
+  }
+
+  /**
+   * Glue the key and values back together.
+   */
+  public static class MyReducer
+      extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+    private final OrcStruct output = new OrcStruct(OUT_SCHEMA);
+    private final NullWritable nada = NullWritable.get();
+
+    @Override
+    protected void reduce(OrcKey key,
+                          Iterable<OrcValue> values,
+                          Context context
+    ) throws IOException, InterruptedException {
+      output.setFieldValue(0, key.key);
+      for(OrcValue value: values) {
+        output.setFieldValue(1, value.value);
+        context.write(nada, output);
+      }
+    }
+  }
+
+  public void writeInputFile(Path inputPath) throws IOException {
+    Writer writer = OrcFile.createWriter(inputPath,
+        OrcFile.writerOptions(CONF).setSchema(INPUT_SCHEMA).overwrite(true));
+    OrcMapreduceRecordWriter<OrcStruct> recordWriter = new OrcMapreduceRecordWriter<>(writer);
+    NullWritable nada = NullWritable.get();
+
+    OrcStruct input = (OrcStruct) OrcStruct.createValue(INPUT_SCHEMA);
+    IntWritable x =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+    IntWritable y =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+    Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+    for(int r = 0; r < 20; ++r) {
+      x.set(100 - (r / 4));
+      y.set(r * 2);
+      z.set(Integer.toHexString(r));
+      recordWriter.write(nada, input);
+    }
+    recordWriter.close(null);
+  }
+
+  private void readOutputFile(Path output) throws IOException, InterruptedException {
+    Reader reader = OrcFile.createReader(output, OrcFile.readerOptions(CONF));
+    OrcMapreduceRecordReader<OrcStruct> recordReader = new OrcMapreduceRecordReader<>(reader,
+            org.apache.orc.mapred.OrcInputFormat.buildOptions(CONF, reader, 0, 20));
+
+    int[] expectedX = new int[20];
+    int[] expectedY = new int[20];
+    String[] expectedZ = new String[20];
+    int count = 0;
+    for(int g = 4; g >= 0; --g) {
+      for(int i = 0; i < 4; ++i) {
+        expectedX[count] = 100 - g;
+        int r = g * 4 + i;
+        expectedY[count] = r * 2;
+        expectedZ[count ++] = Integer.toHexString(r);
+      }
+    }
+
+    int row = 0;
+    while (recordReader.nextKeyValue()) {
+      OrcStruct value = recordReader.getCurrentValue();
+      IntWritable x =
+          (IntWritable) ((OrcStruct) value.getFieldValue(0)).getFieldValue(0);
+      IntWritable y =
+          (IntWritable) ((OrcStruct) value.getFieldValue(0)).getFieldValue(1);
+      Text z = (Text) ((OrcStruct) value.getFieldValue(1)).getFieldValue(0);
+      assertEquals(expectedX[row], x.get());
+      assertEquals(expectedY[row], y.get());
+      assertEquals(expectedZ[row], z.toString());
+      row ++;
+    }
+    recordReader.close();
+  }
+
+  @Test
+  public void testMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+    Path testDir = new Path(TEST_DIR.getAbsolutePath());
+    Path input = new Path(testDir, "input");
+    Path output = new Path(testDir, "output");
+    FS.delete(input, true);
+    FS.delete(output, true);
+
+    writeInputFile(new Path(input, "input.orc"));
+
+    Job job = Job.getInstance(CONF);
+
+    job.setMapperClass(MyMapper.class);
+    job.setInputFormatClass(OrcInputFormat.class);
+    FileInputFormat.setInputPaths(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+    job.setOutputKeyClass(OrcKey.class);
+    job.setOutputValueClass(OrcValue.class);
+    job.setOutputFormatClass(OrcOutputFormat.class);
+    job.setReducerClass(MyReducer.class);
+    job.setNumReduceTasks(1);
+
+    job.waitForCompletion(true);
+
+    FileStatus[] fileStatuses = output.getFileSystem(CONF)
+        .listStatus(output, path -> path.getName().endsWith(".orc"));
+
+    assertEquals(1, fileStatuses.length);
+
+    Path path = fileStatuses[0].getPath();
+    readOutputFile(path);
+  }
+
+}
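
For reference (not part of the commit), the new test should be runnable on its own from the java/ directory with a standard Maven invocation along the lines of:

    mvn test -pl mapreduce -am -Dtest=TestMrUnit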