You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/17 20:37:10 UTC

[hudi] branch master updated: [HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1d5d0  [HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
3d1d5d0 is described below

commit 3d1d5d00b00987e0c04e4ff4692fc50d1be85922
Author: satishkotha <sa...@uber.com>
AuthorDate: Sun Jan 17 12:36:55 2021 -0800

    [HUDI-1533] Make SerializableSchema work for large schemas and add ability to sortBy numeric values (#2453)
---
 .../bulkinsert/RDDCustomColumnsSortPartitioner.java | 19 ++++++++++++-------
 .../hudi/common/config/SerializableSchema.java      |  9 +++++++--
 .../hudi/common/util/TestSerializableSchema.java    | 21 +++++++++++++++++++++
 3 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 65c2000..209531d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,18 +59,22 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   public boolean arePartitionRecordsSorted() {
     return true;
   }
-  
-  private static String getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
+
+  private static Object getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
                                                   String[] sortColumns,
                                                   SerializableSchema schema) {
     try {
       GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
-      StringBuilder sb = new StringBuilder();
-      for (String col : sortColumns) {
-        sb.append(genericRecord.get(col));
-      }
+      if (sortColumns.length == 1) {
+        return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true);
+      } else {
+        StringBuilder sb = new StringBuilder();
+        for (String col : sortColumns) {
+          sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
+        }
 
-      return sb.toString();
+        return sb.toString();
+      }
     } catch (IOException e) {
       throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
index 8f6da70..4f6de8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java
@@ -62,12 +62,17 @@ public class SerializableSchema implements Serializable {
   
   // create a public write method for unit test
   public void writeObjectTo(ObjectOutputStream out) throws IOException {
-    out.writeUTF(schema.toString());
+    // Note: writeUTF cannot write strings whose encoded form exceeds 64K (65535) bytes, so use writeObject, which adds only a small overhead.
+    out.writeObject(schema.toString());
   }
 
   // create a public read method for unit test
   public void readObjectFrom(ObjectInputStream in) throws IOException {
-    schema = new Schema.Parser().parse(in.readUTF());
+    try {
+      schema = new Schema.Parser().parse(in.readObject().toString());
+    } catch (ClassNotFoundException e) {
+      throw new IOException("unable to parse schema", e);
+    }
   }
 
   @Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
index 72843a4..03421a3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java
@@ -49,6 +49,11 @@ public class TestSerializableSchema {
     verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
   }
   
+  @Test
+  public void testLargeSchema() throws IOException {
+    verifySchema(new Schema.Parser().parse(generateLargeSchema()));
+  }
+  
   private void verifySchema(Schema schema) throws IOException {
     SerializableSchema serializableSchema = new SerializableSchema(schema);
     assertEquals(schema, serializableSchema.get());
@@ -65,4 +70,20 @@ public class TestSerializableSchema {
     newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten)));
     assertEquals(schema, newSchema.get());
   }
+  
+  // Generate a large schema (>64K, beyond the limit of ObjectOutputStream#writeUTF) to validate that it can be serialized.
+  private String generateLargeSchema() {
+    StringBuilder schema = new StringBuilder();
+    schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX);
+    int fieldNum = 1;
+    while (schema.length() < 80 * 1024) {
+      String fieldName = "field" + fieldNum;
+      schema.append("{\"name\": \"" + fieldName + "\",\"type\": {\"type\":\"record\", \"name\":\"" + fieldName + "\",\"fields\": ["
+          + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},");
+      fieldNum++;
+    }
+    
+    schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX);
+    return schema.toString();
+  }
 }