You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/18 01:07:08 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1190] Fallback
to full schema if configured shuffle schema is not available
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 32f46b4 [GOBBLIN-1190] Fallback to full schema if configured shuffle schema is not available
32f46b4 is described below
commit 32f46b40e18e71f69ad979f684fb6bd4d055a290
Author: Lei Sun <au...@gmail.com>
AuthorDate: Wed Jun 17 18:06:52 2020 -0700
[GOBBLIN-1190] Fallback to full schema if configured shuffle schema is not available
Fall back to the full schema if the configured shuffle
schema is not available
Fix unit test
Address review comments; compute the eligibility check
once instead of repeating it for every record
Always execute stopServers in elasticSearchWriter
Enforce serial execution in kafka-09 test
Closes #3038 from autumnust/ETL-10913
---
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 39 ++++++++++++++++++++--
.../compaction/mapreduce/orc/OrcValueMapper.java | 14 ++++++--
.../compaction/mapreduce/orc/OrcUtilsTest.java | 29 ++++++++++++++++
gobblin-modules/gobblin-kafka-09/build.gradle | 1 +
4 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 0af4751..ec9efc1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -29,8 +29,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
-import org.apache.gobblin.util.FileListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -59,12 +57,16 @@ import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcTimestamp;
import org.apache.orc.mapred.OrcUnion;
+import org.apache.parquet.format.TypeDefinedOrder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.util.FileListUtils;
+
@Slf4j
public class OrcUtils {
@@ -249,7 +251,7 @@ public class OrcUtils {
(WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
} else {
// Regardless whether type-widening is happening or not, this method copy the value of w into v.
- handlePrimitiveWritableComparable(w, v);
+ handlePrimitiveWritableComparable(w, v);
}
// If non-primitive or type-widening is required, v should already be populated by w's value recursively.
@@ -482,4 +484,35 @@ public class OrcUtils {
public static WritableComparable createValueRecursively(TypeDescription schema) {
return createValueRecursively(schema, 1);
}
+
+  /**
+   * Checks, recursively, whether {@code targetSchema} is structurally contained in {@code originalSchema},
+   * i.e. whether {@code targetSchema} is a projection (subset) of {@code originalSchema}:
+   * <ul>
+   *   <li>primitive types match when their categories are equal (precision, scale and length are not
+   *   compared);</li>
+   *   <li>structs match when every field of {@code targetSchema} exists, by exact name, among the direct
+   *   fields of {@code originalSchema} and the corresponding field types match recursively (field order is
+   *   irrelevant);</li>
+   *   <li>lists, maps and unions match when both sides have the same category and the same number of
+   *   children, and the children match position by position;</li>
+   *   <li>anything else, including a category mismatch between the two schemas, does not match.</li>
+   * </ul>
+   *
+   * @param originalSchema the schema that must contain {@code targetSchema}
+   * @param targetSchema the candidate sub-schema
+   * @return {@code true} if {@code targetSchema} is a subset of {@code originalSchema}
+   */
+  public static boolean eligibleForUpConvertHelper(TypeDescription originalSchema, TypeDescription targetSchema) {
+    if (targetSchema.getCategory().isPrimitive()) {
+      // Only the category of a leaf type is compared.
+      return originalSchema.getCategory().equals(targetSchema.getCategory());
+    }
+
+    // A compound target can only be contained in a compound of the same kind. This check also guards the
+    // getFieldNames() calls below: ORC only populates field names for STRUCT, so calling it on a primitive,
+    // list, map or union schema throws a NullPointerException.
+    if (originalSchema.getCategory() != targetSchema.getCategory()) {
+      return false;
+    }
+
+    if (targetSchema.getCategory() == TypeDescription.Category.STRUCT) {
+      for (int i = 0; i < targetSchema.getFieldNames().size(); i++) {
+        // Look the field up among the direct children by exact name. TypeDescription.findSubtype(String) is
+        // avoided because it interprets its argument as a (possibly dotted or numeric) column path.
+        int originalIndex = originalSchema.getFieldNames().indexOf(targetSchema.getFieldNames().get(i));
+        if (originalIndex < 0 || !eligibleForUpConvertHelper(originalSchema.getChildren().get(originalIndex),
+            targetSchema.getChildren().get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // LIST, MAP and UNION: children are positional (element; key and value; union variants).
+    if (originalSchema.getChildren().size() != targetSchema.getChildren().size()) {
+      return false;
+    }
+    for (int i = 0; i < targetSchema.getChildren().size(); i++) {
+      if (!eligibleForUpConvertHelper(originalSchema.getChildren().get(i), targetSchema.getChildren().get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Decides whether records of {@code originalSchema} may be up-converted to {@code targetSchema}.
+   * This is the case when {@code targetSchema} is a subset of {@code originalSchema} (schema projection),
+   * or when {@code originalSchema} is a subset of {@code targetSchema} (schema expansion).
+   *
+   * @param originalSchema the schema of the incoming record
+   * @param targetSchema the schema the record would be converted to
+   * @return {@code true} if either schema is contained in the other
+   */
+  public static boolean eligibleForUpConvert(TypeDescription originalSchema, TypeDescription targetSchema) {
+    boolean isProjection = eligibleForUpConvertHelper(originalSchema, targetSchema);
+    if (isProjection) {
+      return true;
+    }
+    // Otherwise, accept an expansion: every field of the original must also be present in the target.
+    return eligibleForUpConvertHelper(targetSchema, originalSchema);
+  }
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index b6bcdcd..31cbbdf 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
-import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -38,6 +37,10 @@ import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
+
+import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+
/**
* To keep consistent with {@link OrcMapreduceRecordReader}'s decision on implementing
@@ -52,6 +55,9 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
private OrcValue outValue;
private TypeDescription mrOutputSchema;
private TypeDescription shuffleKeySchema;
+  // Lazily initialized from the schema of the first record seen; indicates whether shuffleKeySchema is
+  // eligible to be used. Check org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert for details.
+ private Boolean isShuffleKeyEligible;
private JobConf jobConf;
// This is added mostly for debuggability.
@@ -119,7 +125,11 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
* Note: This method should have no side-effect on input record.
*/
private void fillDedupKey(OrcStruct originalRecord) {
- if (!originalRecord.getSchema().equals(this.shuffleKeySchema)) {
+ if (this.isShuffleKeyEligible == null) {
+ this.isShuffleKeyEligible = eligibleForUpConvert(originalRecord.getSchema(), this.shuffleKeySchema);
+ }
+
+ if (!originalRecord.getSchema().equals(this.shuffleKeySchema) && isShuffleKeyEligible) {
OrcUtils.upConvertOrcStruct(originalRecord, (OrcStruct) this.outKey.key, this.shuffleKeySchema);
} else {
this.outKey.key = originalRecord;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
index 54527f0..c1cfd14 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtilsTest.java
@@ -293,4 +293,33 @@ public class OrcUtilsTest {
OrcUtils.upConvertOrcStruct(originalStruct, projectColumnStruct, projectedSchema);
Assert.assertEquals(projectColumnStruct, projectedStructExpectedValue);
}
+
+  /**
+   * Verifies {@link OrcUtils#eligibleForUpConvert} for projections, nested projections, expansions and
+   * schemas where neither side contains the other.
+   */
+  @Test
+  public void testSchemaContains() {
+    // Simple case: projection onto a subset of top-level fields.
+    TypeDescription struct0 = TypeDescription.fromString("struct<a:int,b:int>");
+    TypeDescription struct1 = TypeDescription.fromString("struct<a:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct0, struct1));
+
+    // Nested schema case: the projection is applied inside nested structs as well.
+    TypeDescription struct2 = TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
+    TypeDescription struct3 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<c:int>,c:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct2, struct3));
+
+    // Negative cases: neither schema contains the other.
+    TypeDescription struct4 = TypeDescription.fromString("struct<a:struct<a:int,b:int>,b:struct<c:int,d:int>,c:int>");
+    TypeDescription struct5 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<c:int>,d:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct4, struct5));
+    TypeDescription struct6 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int>,c:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct4, struct6));
+
+    // Schema expansion: the target schema contains every field of the original plus more.
+    TypeDescription struct7 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int,f:int>,c:int>");
+    Assert.assertTrue(OrcUtils.eligibleForUpConvert(struct6, struct7));
+
+    // Negative case: the target schema contains more fields, but not every field of the original is in it.
+    // Note that struct8 has a field "a.x" which struct9 lacks.
+    TypeDescription struct8 = TypeDescription.fromString("struct<a:struct<x:int>,b:struct<e:int>,c:int>");
+    TypeDescription struct9 = TypeDescription.fromString("struct<a:struct<a:int>,b:struct<e:int,f:int>,c:int>");
+    Assert.assertFalse(OrcUtils.eligibleForUpConvert(struct8, struct9));
+  }
}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle b/gobblin-modules/gobblin-kafka-09/build.gradle
index 44a8bd2..f0ac579 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -87,6 +87,7 @@ artifacts {
}
test {
+ forkEvery = 1
workingDir rootProject.rootDir
systemProperty "live.newtopic", System.getProperty("live.newtopic")
systemProperty "live.newtopic.replicationCount", System.getProperty("live.newtopic.replicationCount")