Posted to commits@gobblin.apache.org by le...@apache.org on 2020/04/12 16:50:28 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f12dc6e [GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive
f12dc6e is described below
commit f12dc6eb5d84b74aeb9ed237266807a978ae51a4
Author: Lei Sun <le...@linkedin.com>
AuthorDate: Sun Apr 12 09:50:11 2020 -0700
[GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive
OrcValueMapper schema evolution up-conversion recursive
Fix findBugsMain
Address comments
Address comments
Closes #2954 from autumnust/master
---
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 89 +++++++++++-
.../compaction/mapreduce/orc/OrcValueMapper.java | 151 +++++++++------------
.../mapreduce/orc/OrcValueMapperTest.java | 64 ++++++++-
3 files changed, 212 insertions(+), 92 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 7f82690..6d053d2 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -19,8 +19,12 @@ package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
import org.apache.gobblin.util.FileListUtils;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +36,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ConvertTreeReaderFactory;
+import org.apache.orc.impl.SchemaEvolution;
public class OrcUtils {
@@ -50,6 +56,9 @@ public class OrcUtils {
public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
Path[] sourceDirs = FileInputFormat.getInputPaths(job);
+ if (sourceDirs.length == 0) {
+ throw new IllegalStateException("There should be at least one directory specified for the MR job");
+ }
List<FileStatus> files = new ArrayList<FileStatus>();
@@ -67,6 +76,84 @@ public class OrcUtils {
}
throw new IllegalStateException(
- String.format("There's no file carrying orc file schema in %s list", sourceDirs));
+ String.format("There's no file carrying orc file schema in the list of directories: %s", Arrays.toString(sourceDirs)));
+ }
+
+ /**
+ * Determine if two types are following valid evolution.
+ * Implementation adapted from {@link SchemaEvolution}, whose equivalent check is package-private.
+ */
+ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription readerType) {
+ boolean isOk = true;
+ if (fileType.getCategory() == readerType.getCategory()) {
+ switch (readerType.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case FLOAT:
+ case STRING:
+ case TIMESTAMP:
+ case BINARY:
+ case DATE:
+ // these are always a match
+ break;
+ case CHAR:
+ case VARCHAR:
+ break;
+ case DECIMAL:
+ break;
+ case UNION:
+ case MAP:
+ case LIST: {
+ // these must be an exact match
+ List<TypeDescription> fileChildren = fileType.getChildren();
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ if (fileChildren.size() == readerChildren.size()) {
+ for (int i = 0; i < fileChildren.size(); ++i) {
+ isOk &= isEvolutionValid(fileChildren.get(i), readerChildren.get(i));
+ }
+ return isOk;
+ } else {
+ return false;
+ }
+ }
+ case STRUCT: {
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ List<TypeDescription> fileChildren = fileType.getChildren();
+
+ List<String> readerFieldNames = readerType.getFieldNames();
+ List<String> fileFieldNames = fileType.getFieldNames();
+
+ final Map<String, TypeDescription> fileTypesIdx = new HashMap<>();
+ for (int i = 0; i < fileFieldNames.size(); i++) {
+ final String fileFieldName = fileFieldNames.get(i);
+ fileTypesIdx.put(fileFieldName, fileChildren.get(i));
+ }
+
+ for (int i = 0; i < readerFieldNames.size(); i++) {
+ final String readerFieldName = readerFieldNames.get(i);
+ TypeDescription readerField = readerChildren.get(i);
+ TypeDescription fileField = fileTypesIdx.get(readerFieldName);
+ if (fileField == null) {
+ continue;
+ }
+
+ isOk &= isEvolutionValid(fileField, readerField);
+ }
+ return isOk;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown type " + readerType);
+ }
+ return isOk;
+ } else {
+ /*
+ * Check for the few cases where we will not convert.
+ */
+ return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+ }
}
}
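
For context, a minimal usage sketch of the relocated helper (not part of this commit; the class name OrcEvolutionCheckSketch and the schema strings are invented for illustration). Because isEvolutionValid is package-private, the sketch assumes it lives in the same package:

    package org.apache.gobblin.compaction.mapreduce.orc;

    import org.apache.orc.TypeDescription;

    public class OrcEvolutionCheckSketch {
      public static void main(String[] args) {
        // Writer (file) schema vs. the newest reader schema seen by the MR job.
        TypeDescription fileSchema = TypeDescription.fromString("struct<i:int,j:string>");
        TypeDescription readerSchema = TypeDescription.fromString("struct<i:bigint,j:string,k:int>");

        // Widening int -> bigint plus an added column is a tolerated evolution
        // (the unit test below asserts the analogous cases), so this is expected to print true.
        System.out.println(OrcUtils.isEvolutionValid(fileSchema, readerSchema));
      }
    }

The helper only answers the schema-level question; the actual record rewriting is done by the mapper change below.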
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index c23ed9a..ab9b065 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -18,64 +18,83 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ConvertTreeReaderFactory;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
import org.apache.orc.mapred.OrcValue;
import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
/**
* To keep consistent with {@link OrcMapreduceRecordReader}'s decision on implementing
* {@link RecordReader} with {@link NullWritable} as the key and generic type of value, the ORC Mapper will
* read in the record as the input value.
*/
+@Slf4j
public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct, Object, OrcValue> {
private OrcValue outValue;
private TypeDescription mapperSchema;
+ // This is added mostly for debuggability.
+ private static int writeCount = 0;
+
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
this.outValue = new OrcValue();
- this.mapperSchema = TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
+ this.mapperSchema =
+ TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
}
@Override
protected void map(NullWritable key, OrcStruct orcStruct, Context context)
throws IOException, InterruptedException {
- OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
- if (context.getNumReduceTasks() == 0) {
- this.outValue.value = upConvertedStruct;
- context.write(NullWritable.get(), this.outValue);
- } else {
- this.outValue.value = upConvertedStruct;
- context.write(getDedupKey(upConvertedStruct), this.outValue);
+ OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, mapperSchema);
+ this.outValue.value = upConvertedStruct;
+ try {
+ if (context.getNumReduceTasks() == 0) {
+ context.write(NullWritable.get(), this.outValue);
+ } else {
+ context.write(getDedupKey(upConvertedStruct), this.outValue);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failure in write record no." + writeCount, e);
}
+ writeCount += 1;
context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
}
/**
- * If a {@link OrcStruct}'s schema differs from newest schema obtained when creating MR jobs (which is the
- * newest schema seen by the MR job), all the other ORC object will need to be up-converted.
+ * Recursively up-convert the {@link OrcStruct} into {@link #mapperSchema}.
+ * Limitations:
+ * 1. Does not support up-conversion of key types in Maps.
+ * 2. Conversion only happens if org.apache.gobblin.compaction.mapreduce.orc.OrcUtils#isEvolutionValid returns true.
*/
- OrcStruct upConvertOrcStruct(OrcStruct orcStruct, Context context) {
+ @VisibleForTesting
+ OrcStruct upConvertOrcStruct(OrcStruct orcStruct, TypeDescription mapperSchema) {
// For ORC schema, if schema object differs that means schema itself is different while for Avro,
// there are chances that documentation or attributes' difference lead to the schema object difference.
if (!orcStruct.getSchema().equals(mapperSchema)) {
+ log.info("There's schema mismatch identified from reader's schema and writer's schema");
OrcStruct newStruct = new OrcStruct(mapperSchema);
int indexInNewSchema = 0;
@@ -90,8 +109,10 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
TypeDescription fileType = oldSchemaTypes.get(fieldIndex);
TypeDescription readerType = newSchemaTypes.get(indexInNewSchema);
- if (isEvolutionValid(fileType, readerType)) {
- newStruct.setFieldValue(field, orcStruct.getFieldValue(field));
+ if (OrcUtils.isEvolutionValid(fileType, readerType)) {
+ WritableComparable oldField = orcStruct.getFieldValue(field);
+ oldField = structConversionHelper(oldField, mapperSchema.getChildren().get(fieldIndex));
+ newStruct.setFieldValue(field, oldField);
} else {
throw new SchemaEvolution.IllegalEvolutionException(String
.format("ORC does not support type conversion from file" + " type %s to reader type %s ",
@@ -111,81 +132,37 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
}
/**
- * Determine if two types are following valid evolution.
- * Implementation stolen and manipulated from {@link SchemaEvolution} as that was package-private.
+ * Suppresses the unchecked-cast warning: all casts are valid since the objects are (sub)elements of ORC types.
+ * A failed cast throws a ClassCastException and fails the process.
*/
- static boolean isEvolutionValid(TypeDescription fileType, TypeDescription readerType) {
- boolean isOk = true;
- if (fileType.getCategory() == readerType.getCategory()) {
- switch (readerType.getCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case DOUBLE:
- case FLOAT:
- case STRING:
- case TIMESTAMP:
- case BINARY:
- case DATE:
- // these are always a match
- break;
- case CHAR:
- case VARCHAR:
- break;
- case DECIMAL:
- break;
- case UNION:
- case MAP:
- case LIST: {
- // these must be an exact match
- List<TypeDescription> fileChildren = fileType.getChildren();
- List<TypeDescription> readerChildren = readerType.getChildren();
- if (fileChildren.size() == readerChildren.size()) {
- for (int i = 0; i < fileChildren.size(); ++i) {
- isOk &= isEvolutionValid(fileChildren.get(i), readerChildren.get(i));
- }
- return isOk;
- } else {
- return false;
- }
- }
- case STRUCT: {
- List<TypeDescription> readerChildren = readerType.getChildren();
- List<TypeDescription> fileChildren = fileType.getChildren();
-
- List<String> readerFieldNames = readerType.getFieldNames();
- List<String> fileFieldNames = fileType.getFieldNames();
-
- final Map<String, TypeDescription> fileTypesIdx = new HashMap();
- for (int i = 0; i < fileFieldNames.size(); i++) {
- final String fileFieldName = fileFieldNames.get(i);
- fileTypesIdx.put(fileFieldName, fileChildren.get(i));
- }
-
- for (int i = 0; i < readerFieldNames.size(); i++) {
- final String readerFieldName = readerFieldNames.get(i);
- TypeDescription readerField = readerChildren.get(i);
- TypeDescription fileField = fileTypesIdx.get(readerFieldName);
- if (fileField == null) {
- continue;
- }
-
- isOk &= isEvolutionValid(fileField, readerField);
- }
- return isOk;
- }
- default:
- throw new IllegalArgumentException("Unknown type " + readerType);
+ @SuppressWarnings("unchecked")
+ private WritableComparable structConversionHelper(WritableComparable w, TypeDescription mapperSchema) {
+ if (w instanceof OrcStruct) {
+ return upConvertOrcStruct((OrcStruct) w, mapperSchema);
+ } else if (w instanceof OrcList) {
+ OrcList castedList = (OrcList) w;
+ TypeDescription elementType = mapperSchema.getChildren().get(0);
+ for (int i = 0; i < castedList.size(); i++) {
+ castedList.set(i, structConversionHelper((WritableComparable) castedList.get(i), elementType));
}
- return isOk;
- } else {
- /*
- * Check for the few cases where will not convert....
- */
- return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+ } else if (w instanceof OrcMap) {
+ OrcMap castedMap = (OrcMap) w;
+ for (Object entry : castedMap.entrySet()) {
+ Map.Entry<WritableComparable, WritableComparable> castedEntry =
+ (Map.Entry<WritableComparable, WritableComparable>) entry;
+ castedMap.put(castedEntry.getKey(),
+ structConversionHelper(castedEntry.getValue(), mapperSchema.getChildren().get(1)));
+ }
+ return castedMap;
+ } else if (w instanceof OrcUnion) {
+ OrcUnion castedUnion = (OrcUnion) w;
+ byte tag = castedUnion.getTag();
+ castedUnion.set(tag,
+ structConversionHelper((WritableComparable) castedUnion.getObject(), mapperSchema.getChildren().get(tag)));
}
+
+ // Directly return if primitive object.
+ return w;
}
/**
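
For illustration, a hedged sketch of the recursive up-conversion from a caller's point of view (the class UpConvertSketch, field names, and schemas are invented; upConvertOrcStruct is package-private, so the sketch assumes the same package). It mirrors the list-in-struct case exercised by the unit test below:

    package org.apache.gobblin.compaction.mapreduce.orc;

    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;

    public class UpConvertSketch {
      public static void main(String[] args) {
        OrcValueMapper mapper = new OrcValueMapper();

        // Writer schema with a struct nested inside a list.
        TypeDescription oldSchema = TypeDescription.fromString("struct<a:array<struct<x:int>>>");
        // Reader (mapper) schema that adds a field to the nested struct.
        TypeDescription newSchema = TypeDescription.fromString("struct<a:array<struct<x:int,y:string>>>");

        OrcStruct record = (OrcStruct) OrcStruct.createValue(oldSchema);
        // The record is rebuilt against newSchema; structConversionHelper recurses into the
        // list so the nested struct is converted as well, not just the top-level fields.
        OrcStruct converted = mapper.upConvertOrcStruct(record, newSchema);
        System.out.println(converted.getSchema().equals(newSchema));  // expected: true
      }
    }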
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
index 2304bdc..651c512 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
+import org.apache.orc.mapred.OrcStruct;
import org.junit.Assert;
import org.testng.annotations.Test;
@@ -29,9 +31,63 @@ public class OrcValueMapperTest {
TypeDescription schema_2 = TypeDescription.fromString("struct<i:int,j:int,k:bigint>");
TypeDescription schema_3 = TypeDescription.fromString("struct<i:int,j:int,k:tinyint>");
TypeDescription schema_4 = TypeDescription.fromString("struct<i:int,j:int>");
- Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_2));
- Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_3));
- Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_4));
- Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_4, schema_1));
+ Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_2));
+ Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_3));
+ Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_4));
+ Assert.assertTrue(OrcUtils.isEvolutionValid(schema_4, schema_1));
+ }
+
+ @Test
+ public void testUpConvertOrcStruct(){
+ OrcValueMapper mapper = new OrcValueMapper();
+
+ // Basic case.
+ TypeDescription baseStructSchema = TypeDescription.fromString("struct<a:int,b:string>");
+ OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
+ TypeDescription evolved_baseStructSchema = TypeDescription.fromString("struct<a:int,b:string,c:int>");
+ OrcStruct evolvedStruct = (OrcStruct) OrcStruct.createValue(evolved_baseStructSchema);
+ OrcStruct resultStruct = mapper.upConvertOrcStruct(baseStruct, evolved_baseStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), evolved_baseStructSchema);
+
+ // Base case: Reverse direction.
+ resultStruct = mapper.upConvertOrcStruct(evolvedStruct, baseStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), baseStructSchema);
+
+ // Simple Nested: List/Map/Union/Struct within Struct.
+ TypeDescription listInStructSchema = TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
+ OrcStruct listInStruct = (OrcStruct) OrcStruct.createValue(listInStructSchema);
+ TypeDescription evolved_listInStructSchema = TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:string>>>");
+ OrcStruct evolved_listInStruct = (OrcStruct) OrcStruct.createValue(evolved_listInStructSchema);
+ resultStruct = mapper.upConvertOrcStruct(listInStruct, evolved_listInStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), evolved_listInStructSchema);
+ resultStruct = mapper.upConvertOrcStruct(evolved_listInStruct, listInStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), listInStructSchema);
+
+ TypeDescription mapInStructSchema = TypeDescription.fromString("struct<a:map<string,int>>");
+ OrcStruct mapInStruct = (OrcStruct) OrcStruct.createValue(mapInStructSchema);
+ TypeDescription evolved_mapInStructSchema = TypeDescription.fromString("struct<a:map<string,bigint>>");
+ OrcStruct evolved_mapInStruct = (OrcStruct) OrcStruct.createValue(evolved_mapInStructSchema);
+ resultStruct = mapper.upConvertOrcStruct(mapInStruct, evolved_mapInStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), evolved_mapInStructSchema);
+ // Reverse direction: the map value type narrows from bigint to int. If this evolution
+ // were rejected, upConvertOrcStruct would throw SchemaEvolution.IllegalEvolutionException
+ // instead of returning a converted record.
+ try {
+ resultStruct = mapper.upConvertOrcStruct(evolved_mapInStruct, mapInStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), mapInStructSchema);
+ } catch (SchemaEvolution.IllegalEvolutionException ie) {
+ Assert.fail("Map value narrowing is expected to be a convertible evolution");
+ }
+
+ TypeDescription unionInStructSchema = TypeDescription.fromString("struct<a:uniontype<int,string>>");
+ OrcStruct unionInStruct = (OrcStruct) OrcStruct.createValue(unionInStructSchema);
+ TypeDescription evolved_unionInStructSchema = TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
+ resultStruct = mapper.upConvertOrcStruct(unionInStruct, evolved_unionInStructSchema);
+ Assert.assertEquals(resultStruct.getSchema(), evolved_unionInStructSchema);
+
+ // Complex: List<Struct> within struct among others and evolution happens on multiple places.
+ TypeDescription complex_1 = TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
+ OrcStruct complex_struct = (OrcStruct) OrcStruct.createValue(complex_1);
+ TypeDescription evolved_complex_1 = TypeDescription.fromString("struct<a:array<struct<a:string,b:int,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
+ resultStruct = mapper.upConvertOrcStruct(complex_struct, evolved_complex_1);
+ Assert.assertEquals(resultStruct.getSchema(), evolved_complex_1);
}
}
\ No newline at end of file
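
Complementing the map test above, a hedged sketch of the documented limitation (class name MapValueUpConvertSketch, field and key names are illustrative only, same-package assumption as before): container schemas are rewritten recursively, but map keys and primitive leaf values are passed through unchanged.

    package org.apache.gobblin.compaction.mapreduce.orc;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcMap;
    import org.apache.orc.mapred.OrcStruct;

    public class MapValueUpConvertSketch {
      public static void main(String[] args) {
        OrcValueMapper mapper = new OrcValueMapper();

        TypeDescription oldSchema = TypeDescription.fromString("struct<a:map<string,int>>");
        TypeDescription newSchema = TypeDescription.fromString("struct<a:map<string,bigint>>");

        OrcStruct record = (OrcStruct) OrcStruct.createValue(oldSchema);
        OrcMap map = (OrcMap) record.getFieldValue("a");
        map.put(new Text("k"), new IntWritable(1));

        OrcStruct converted = mapper.upConvertOrcStruct(record, newSchema);

        // The struct is rebuilt against the bigint-valued map schema ...
        System.out.println(converted.getSchema());
        // ... but the leaf value object is passed through untouched (still an IntWritable),
        // and map keys are never up-converted, matching limitation 1 in the mapper's javadoc.
        OrcMap convertedMap = (OrcMap) converted.getFieldValue("a");
        System.out.println(convertedMap.get(new Text("k")).getClass().getSimpleName());
      }
    }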