You are viewing a plain text version of this content; the canonical (HTML) version is available at the original mailing-list archive link.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/01/19 00:41:04 UTC
[incubator-hudi] branch master updated: [HUDI-552] Fix the schema
mismatch in Row-to-Avro conversion (#1246)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d0ee95e [HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
d0ee95e is described below
commit d0ee95ed16de6c3568f575169cb993b9c10ced3d
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Sat Jan 18 16:40:56 2020 -0800
[HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
---
.../org/apache/hudi/AvroConversionUtils.scala | 8 ++-
.../hudi/utilities/deltastreamer/DeltaSync.java | 15 ++++-
.../deltastreamer/SourceFormatAdapter.java | 13 +++-
.../hudi/utilities/TestHoodieDeltaStreamer.java | 76 +++++++++++++++++++++-
4 files changed, 104 insertions(+), 8 deletions(-)
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a27d0ee..16c2d75 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -31,12 +31,14 @@ object AvroConversionUtils {
def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
- createRdd(df, avroSchema.toString, structName, recordNamespace)
+ createRdd(df, avroSchema, structName, recordNamespace)
}
- def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+ def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
- val dataType = df.schema
+ // Use the Avro schema to derive the StructType which has the correct nullability information
+ val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+ val avroSchemaAsJsonString = avroSchema.toString
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5f3259a..2dd4138 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -290,8 +290,19 @@ public class DeltaSync implements Serializable {
Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
- avroRDDOptional = transformed
- .map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+ if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+ // If the target schema is specified through Avro schema,
+ // pass in the schema for the Row-to-Avro conversion
+ // to avoid nullability mismatch between Avro schema and Row schema
+ avroRDDOptional = transformed
+ .map(t -> AvroConversionUtils.createRdd(
+ t, this.schemaProvider.getTargetSchema(),
+ HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+ } else {
+ avroRDDOptional = transformed
+ .map(t -> AvroConversionUtils.createRdd(
+ t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+ }
// Use Transformed Row's schema if not overridden
// Use Transformed Row's schema if not overridden. If target schema is not specified
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index dd266ed..9c0be88 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
@@ -64,7 +65,17 @@ public final class SourceFormatAdapter {
case ROW: {
InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
return new InputBatch<>(Option.ofNullable(r.getBatch().map(
- rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
+ rdd -> (
+ (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
+ // If the source schema is specified through Avro schema,
+ // pass in the schema for the Row-to-Avro conversion
+ // to avoid nullability mismatch between Avro schema and Row schema
+ ? AvroConversionUtils.createRdd(
+ rdd, r.getSchemaProvider().getSourceSchema(),
+ HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+ : AvroConversionUtils.createRdd(
+ rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+ ))
.orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
default:
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 641c47b..cbf9db9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -44,6 +45,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.DistributedTestDataSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
@@ -96,8 +98,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final Random RANDOM = new Random();
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+ private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+ private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+ private static final int PARQUET_NUM_RECORDS = 5;
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
+ private static int parquetTestNum = 1;
+
@BeforeClass
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
@@ -146,6 +153,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
+
+ prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
}
@AfterClass
@@ -186,17 +195,24 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
String payloadClassName, String tableType) {
+ return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+ useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+ }
+
+ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
+ String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+ int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
- cfg.sourceClassName = TestDataSource.class.getName();
+ cfg.sourceClassName = sourceClassName;
cfg.transformerClassName = transformerClassName;
cfg.operation = op;
cfg.enableHiveSync = enableHiveSync;
cfg.sourceOrderingField = "timestamp";
cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
- cfg.sourceLimit = 1000;
+ cfg.sourceLimit = sourceLimit;
if (updatePayloadClass) {
cfg.payloadClassName = payloadClassName;
}
@@ -620,6 +636,62 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
Assert.assertEquals(1000, c);
}
+ private static void prepareParquetDFSFiles(int numRecords) throws IOException {
+ String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+ dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+ }
+
+ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+ // Properties used for testing delta-streamer with Parquet source
+ TypedProperties parquetProps = new TypedProperties();
+ parquetProps.setProperty("include", "base.properties");
+ parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+ parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+ if (useSchemaProvider) {
+ parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+ if (hasTransformer) {
+ parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+ }
+ }
+ parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+
+ UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+ }
+
+ private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
+ prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+ String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
+ transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null), jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+ parquetTestNum++;
+ }
+
+ @Test
+ public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
+ testParquetDFSSource(false, null);
+ }
+
+ @Test
+ public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
+ testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+ }
+
+ @Test
+ public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
+ testParquetDFSSource(true, null);
+ }
+
+ @Test
+ public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
+ testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+ }
+
/**
* UDF to calculate Haversine distance.
*/