You are viewing a plain text version of this content; the canonical HTML version is available from the mailing-list archive.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/18 01:07:08 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1190] Fallback to full schema if configured shuffle schema is not available

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f46b4  [GOBBLIN-1190] Fallback to full schema if configured shuffle schema is not available
32f46b4 is described below

commit 32f46b40e18e71f69ad979f684fb6bd4d055a290
Author: Lei Sun <au...@gmail.com>
AuthorDate: Wed Jun 17 18:06:52 2020 -0700

    [GOBBLIN-1190] Fallback to full schema if configured shuffle schema is not available
    
    Fallback to full schema if configured shuffle
    schema is not available
    
    Fix unit test
    
    Address comments; Optimize the repeated usage of
    eligibility-check
    
    Always execute stopServers in elasticSearchWriter
    
    Enforce serial execution in kafka-09 test
    
    Closes #3038 from autumnust/ETL-10913
---
 .../gobblin/compaction/mapreduce/orc/OrcUtils.java | 39 ++++++++++++++++++++--
 .../compaction/mapreduce/orc/OrcValueMapper.java   | 14 ++++++--
 .../compaction/mapreduce/orc/OrcUtilsTest.java     | 29 ++++++++++++++++
 gobblin-modules/gobblin-kafka-09/build.gradle      |  1 +
 4 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 0af4751..ec9efc1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -29,8 +29,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.util.FileListUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,12 +57,16 @@ import org.apache.orc.mapred.OrcMap;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcTimestamp;
 import org.apache.orc.mapred.OrcUnion;
+import org.apache.parquet.format.TypeDefinedOrder;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.util.FileListUtils;
+
 
 @Slf4j
 public class OrcUtils {
@@ -249,7 +251,7 @@ public class OrcUtils {
           (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
     } else {
       // Regardless whether type-widening is happening or not, this method copy the value of w into v.
-        handlePrimitiveWritableComparable(w, v);
+      handlePrimitiveWritableComparable(w, v);
     }
 
     // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
@@ -482,4 +484,35 @@ public class OrcUtils {
   public static WritableComparable createValueRecursively(TypeDescription schema) {
     return createValueRecursively(schema, 1);
   }
+
+  /**
+   * Check recursively if owning schema is eligible to be up-converted to targetSchema if
+   * TargetSchema is a subset of originalSchema
+   */
+  public static boolean eligibleForUpConvertHelper(TypeDescription originalSchema, TypeDescription targetSchema) {
+    if (!targetSchema.getCategory().isPrimitive()) {
+      if (!originalSchema.getFieldNames().containsAll(targetSchema.getFieldNames())) {
+        return false;
+      }
+      boolean result = true;
+
+      for (int i = 0; i < targetSchema.getFieldNames().size(); i++) {
+        String subSchemaFieldName = targetSchema.getFieldNames().get(i);
+        result &= eligibleForUpConvertHelper(originalSchema.findSubtype(subSchemaFieldName),
+            targetSchema.getChildren().get(i));
+      }
+
+      return result;
+    } else {
+      // Check the unit type: Only for the category.
+      return originalSchema.getCategory().equals(targetSchema.getCategory());
+    }
+  }
+
+  // Eligibility for up-conversion: If targetSchema is a subset of originalSchema (Schema projection)
+  // and vice-versa (schema expansion).
+  public static boolean eligibleForUpConvert(TypeDescription originalSchema, TypeDescription targetSchema) {
+    return eligibleForUpConvertHelper(originalSchema, targetSchema) || eligibleForUpConvertHelper(targetSchema,
+        originalSchema);
+  }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index b6bcdcd..31cbbdf 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 
-import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +37,10 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+
+import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+
 
 /**
  * To keep consistent with {@link OrcMapreduceRecordReader}'s decision on implementing
@@ -52,6 +55,9 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
   private OrcValue outValue;
   private TypeDescription mrOutputSchema;
   private TypeDescription shuffleKeySchema;
+  // Lazily initiated flag indicating if shuffleKeySchema is eligible to be used.
+  // Check org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert for details.
+  private Boolean isShuffleKeyEligible;
   private JobConf jobConf;
 
   // This is added mostly for debuggability.
@@ -119,7 +125,11 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
    * Note: This method should have no side-effect on input record.
    */
   private void fillDedupKey(OrcStruct originalRecord) {
-    if (!originalRecord.getSchema().equals(this.shuffleKeySchema)) {
+    if (this.isShuffleKeyEligible == null) {
+      this.isShuffleKeyEligible = eligibleForUpConvert(originalRecord.getSchema(), this.shuffleKeySchema);
+    }
+
+    if (!originalRecord.getSchema().equals(this.shuffleKeySchema) && isShuffleKeyEligible) {
       OrcUtils.upConvertOrcStruct(originalRecord, (OrcStruct) this.outKey.key, this.shuffleKeySchema);
     } else {
       this.outKey.key = originalRecord;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
index 54527f0..c1cfd14 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
@@ -293,4 +293,33 @@ public class OrcUtilsTest {
     OrcUtils.upConvertOrcStruct(originalStruct, projectColumnStruct, projectedSchema);
     Assert.assertEquals(projectColumnStruct, projectedStructExpectedValue);
   }
+
+  public void testSchemaContains() throws Exception {
+    // Simple case.
+    TypeDescription struct_0 = TypeDescription.fromString("struct<a:int,b:int>");
+    TypeDescription struct_1 = TypeDescription.fromString("struct<a:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_0, struct_1));
+
+    // Nested schema case.
+    TypeDescription struct_2 = TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
+    TypeDescription struct_3 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<c:int>,c:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_2, struct_3));
+
+    // Negative case.
+    TypeDescription struct_4 = TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
+    TypeDescription struct_5 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<c:int>,d:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct_4, struct_5));
+    TypeDescription struct_6 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int>,c:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct_4, struct_6));
+
+    // Cases when target schema contains more
+    TypeDescription struct_7 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int,f:int>,c:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct_6, struct_7));
+
+    // Negative case when target schema contains more but not all of the owning schema are there in the target schema.
+    // Note that struct_8 has a field "a.x".
+    TypeDescription struct_8 = TypeDescription.fromString("struct<a:struct<x:int>,b:struct<e:int>,c:int>");
+    TypeDescription struct_9 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int,f:int>,c:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct_8, struct_9));
+  }
 }
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 44a8bd2..f0ac579 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -87,6 +87,7 @@ artifacts {
 }
 
 test {
+  forkEvery = 1
   workingDir rootProject.rootDir
   systemProperty "live.newtopic", System.getProperty("live.newtopic")
   systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")