You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@orc.apache.org by om...@apache.org on 2020/03/27 19:53:25 UTC

[orc] branch master updated: ORC-613: Fix OrcMapredRecordReader when dealing with union of multiple structs with different schema

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c79e04  ORC-613: Fix OrcMapredRecordReader when dealing with union of multiple structs with different schema
6c79e04 is described below

commit 6c79e04a9018baf6c68092912da8f2774f35050e
Author: Lei Sun <le...@apache.org>
AuthorDate: Tue Mar 17 16:20:39 2020 -0700

    ORC-613: Fix OrcMapredRecordReader when dealing with union of multiple
    structs with different schema
    
    Fixes #498
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 .../apache/orc/mapred/OrcMapredRecordReader.java   | 17 ++++-
 .../test/org/apache/orc/mapred/TestOrcStruct.java  | 89 +++++++++++++++++++++-
 2 files changed, 102 insertions(+), 4 deletions(-)

diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
index ea49788..0a58774 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -394,10 +394,10 @@ public class OrcMapredRecordReader<V extends WritableComparable>
       OrcStruct result;
       List<TypeDescription> childrenTypes = schema.getChildren();
       int numChildren = childrenTypes.size();
-      if (previous == null || previous.getClass() != OrcStruct.class) {
-        result = new OrcStruct(schema);
-      } else {
+      if (isReusable(previous, schema)) {
         result = (OrcStruct) previous;
+      } else {
+        result = new OrcStruct(schema);
       }
       StructColumnVector struct = (StructColumnVector) vector;
       for(int f=0; f < numChildren; ++f) {
@@ -410,6 +410,17 @@ public class OrcMapredRecordReader<V extends WritableComparable>
     }
   }
 
+  /**
+   * Determine whether {@code previous} is an OrcStruct that can be reused for {@code schema}.
+   */
+  private static boolean isReusable(Object previous, TypeDescription schema) {
+    if (previous == null || previous.getClass() != OrcStruct.class) {
+      return false;
+    }
+    // Compare schemas too: union branches may be structs with different fields (ORC-613).
+    return ((OrcStruct) previous).getSchema().equals(schema);
+  }
+
   static OrcUnion nextUnion(ColumnVector vector,
                             int row,
                             TypeDescription schema,
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
index 82699ed..b579d9a 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
@@ -18,20 +18,39 @@
 
 package org.apache.orc.mapred;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetadata;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
+import com.google.common.io.Files;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+
 public class TestOrcStruct {
 
   @Rule
@@ -67,6 +86,74 @@ public class TestOrcStruct {
   }
 
   @Test
+  public void testMapredRead() throws Exception {
+    // ORC-613: a union whose branches are structs with different schemas must read back
+    // correctly even when the caller reuses a single value object across rows.
+    TypeDescription internalStruct_0 = TypeDescription.createStruct()
+        .addField("field0", TypeDescription.createString())
+        .addField("field1", TypeDescription.createBoolean());
+    TypeDescription internalStruct_1 = TypeDescription.createStruct();
+    TypeDescription internalStruct_2 = TypeDescription.createStruct()
+        .addField("f0", TypeDescription.createInt());
+    TypeDescription unionWithMultipleStruct = TypeDescription.createUnion()
+        .addUnionChild(internalStruct_0)
+        .addUnionChild(internalStruct_1)
+        .addUnionChild(internalStruct_2);
+
+    OrcStruct o1 = new OrcStruct(internalStruct_0);
+    o1.setFieldValue("field0", new Text("key"));
+    o1.setFieldValue("field1", new BooleanWritable(true));
+    OrcStruct o2 = new OrcStruct(internalStruct_0);
+    o2.setFieldValue("field0", new Text("key_1"));
+    o2.setFieldValue("field1", new BooleanWritable(false));
+    OrcStruct o3 = new OrcStruct(internalStruct_1);
+    OrcStruct o4 = new OrcStruct(internalStruct_2);
+    o4.setFieldValue("f0", new IntWritable(1));
+
+    OrcUnion u1 = new OrcUnion(unionWithMultipleStruct);
+    u1.set(0, o1);
+    OrcUnion u2 = new OrcUnion(unionWithMultipleStruct);
+    u2.set(0, o2);
+    OrcUnion u3 = new OrcUnion(unionWithMultipleStruct);
+    u3.set(1, o3);
+    OrcUnion u4 = new OrcUnion(unionWithMultipleStruct);
+    u4.set(2, o4);
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path testDir = new Path(Files.createTempDir().getAbsolutePath());
+    Path testFilePath = new Path(testDir, "testFile");
+    try {
+      Writer writer = OrcFile.createWriter(testFilePath,
+          OrcFile.writerOptions(conf).setSchema(unionWithMultipleStruct).fileSystem(fs)
+              .stripeSize(100000).bufferSize(10000)
+              .version(OrcFile.Version.CURRENT));
+      OrcMapredRecordWriter<OrcUnion> recordWriter = new OrcMapredRecordWriter<>(writer);
+      recordWriter.write(NullWritable.get(), u1);
+      recordWriter.write(NullWritable.get(), u2);
+      recordWriter.write(NullWritable.get(), u3);
+      recordWriter.write(NullWritable.get(), u4);
+      recordWriter.close(null);
+      Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+      Reader.Options options = reader.options().schema(unionWithMultipleStruct);
+      OrcMapredRecordReader<OrcUnion> recordReader = new OrcMapredRecordReader<>(reader, options);
+      try {
+        // Reuse one value object so every row goes through the struct-reuse path.
+        OrcUnion result = recordReader.createValue();
+        for (OrcUnion expected : new OrcUnion[] {u1, u2, u3, u4}) {
+          Assert.assertTrue(recordReader.next(recordReader.createKey(), result));
+          Assert.assertEquals(expected, result);
+        }
+        Assert.assertFalse(recordReader.next(recordReader.createKey(), result));
+      } finally {
+        recordReader.close();
+      }
+    } finally {
+      fs.delete(testDir, true);
+    }
+  }
+
+  @Test
   public void testFieldAccess() {
     OrcStruct struct = new OrcStruct(TypeDescription.fromString
         ("struct<i:int,j:double,k:string>"));