You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this rendering.
Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/17 20:37:10 UTC
[hudi] branch master updated: [HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1d5d0 [HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
3d1d5d0 is described below
commit 3d1d5d00b00987e0c04e4ff4692fc50d1be85922
Author: satishkotha <sa...@uber.com>
AuthorDate: Sun Jan 17 12:36:55 2021 -0800
[HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
---
.../bulkinsert/RDDCustomColumnsSortPartitioner.java | 19 ++++++++++++-------
.../hudi/common/config/SerializableSchema.java | 9 +++++++--
.../hudi/common/util/TestSerializableSchema.java | 21 +++++++++++++++++++++
3 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 65c2000..209531d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.execution.bulkinsert;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,18 +59,22 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
public boolean arePartitionRecordsSorted() {
return true;
}
-
- private static String getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
+
+ private static Object getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] sortColumns,
SerializableSchema schema) {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
- StringBuilder sb = new StringBuilder();
- for (String col : sortColumns) {
- sb.append(genericRecord.get(col));
- }
+ if (sortColumns.length == 1) {
+ return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true);
+ } else {
+ StringBuilder sb = new StringBuilder();
+ for (String col : sortColumns) {
+ sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
+ }
- return sb.toString();
+ return sb.toString();
+ }
} catch (IOException e) {
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
index 8f6da70..4f6de8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
@@ -62,12 +62,17 @@ public class SerializableSchema implements Serializable {
// create a public write method for unit test
public void writeObjectTo(ObjectOutputStream out) throws IOException {
- out.writeUTF(schema.toString());
+ // Note: writeUTF cannot support string length > 64K. So use writeObject which has small overhead (relatively).
+ out.writeObject(schema.toString());
}
// create a public read method for unit test
public void readObjectFrom(ObjectInputStream in) throws IOException {
- schema = new Schema.Parser().parse(in.readUTF());
+ try {
+ schema = new Schema.Parser().parse(in.readObject().toString());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("unable to parse schema", e);
+ }
}
@Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
index 72843a4..03421a3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
@@ -49,6 +49,11 @@ public class TestSerializableSchema {
verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
}
+ @Test
+ public void testLargeSchema() throws IOException {
+ verifySchema(new Schema.Parser().parse(generateLargeSchema()));
+ }
+
private void verifySchema(Schema schema) throws IOException {
SerializableSchema serializableSchema = new SerializableSchema(schema);
assertEquals(schema, serializableSchema.get());
@@ -65,4 +70,20 @@ public class TestSerializableSchema {
newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten)));
assertEquals(schema, newSchema.get());
}
+
+ // generate large schemas (>64K which is limitation of ObjectOutputStream#writeUTF) to validate it can be serialized
+ private String generateLargeSchema() {
+ StringBuilder schema = new StringBuilder();
+ schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX);
+ int fieldNum = 1;
+ while (schema.length() < 80 * 1024) {
+ String fieldName = "field" + fieldNum;
+ schema.append("{\"name\": \"" + fieldName + "\",\"type\": {\"type\":\"record\", \"name\":\"" + fieldName + "\",\"fields\": ["
+ + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},");
+ fieldNum++;
+ }
+
+ schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX);
+ return schema.toString();
+ }
}