You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/12 15:04:30 UTC

[hudi] branch master updated: [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eacae1e0dc4 [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)
eacae1e0dc4 is described below

commit eacae1e0dc44e29bbe5cfc29475666bb16ee478d
Author: Tim Brown <ti...@onehouse.ai>
AuthorDate: Thu Jan 12 07:04:17 2023 -0800

    [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)
    
    - Adds a new KeyGenerator that does not require the user to specify any fields to use for the record key and instead deterministically generates a UUID based off a subset of fields in the incoming record.
---
 .../apache/hudi/keygen/KeylessKeyGenerator.java    | 239 +++++++++++++++++++++
 .../hudi/keygen/TestKeylessKeyGenerator.java       | 119 ++++++++++
 .../src/test/resources/keyless_schema.avsc         |  44 ++++
 .../hudi/keygen/constant/KeyGeneratorOptions.java  |   6 +
 4 files changed, 408 insertions(+)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
new file mode 100644
index 00000000000..d487e7e1ff9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Computes a deterministic key for a record based on the contents of the record. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args,
+ * which creates a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows
+ * with deduplication disabled. The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing
+ * likelihood for uniqueness. The ordering is:
+ * <ul>
+ *   <li>timestamp</li>
+ *   <li>numeric values</li>
+ *   <li>string, byte arrays, other types not mentioned</li>
+ *   <li>date, lists, maps, booleans</li>
+ *   <li>Hudi meta fields (names starting with {@code _hoodie})</li>
+ * </ul>
+ * Ties are broken in favor of less nested fields, then fields declared earlier in the schema. Fields used in the partition path are excluded.
+ * The number of fields is capped to create predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets.
+ * The field ordering is computed from the schema of the first record seen and is reused for all subsequent records.
+ */
+public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
+  private static final String HOODIE_PREFIX = "_hoodie";
+  private static final String DOT = ".";
+  // upper bound on the number of leaf fields kept while ranking the schema
+  private final int maxFieldsToConsider;
+  // number of non-null field values hashed into each key
+  private final int numFieldsForKey;
+  // partition path field names with the key generator type suffix (split off via SPLIT_REGEX) removed; these fields are excluded from the key
+  private final Set<String> partitionFieldNames;
+  // lazily built from the first record's schema, see getFieldOrdering
+  private int[][] fieldOrdering;
+
+  /**
+   * Creates a keyless key generator.
+   * @param props write configs; {@link KeyGeneratorOptions#NUM_FIELDS_IN_KEYLESS_GENERATOR} sets how many non-null field values are used per key
+   * @throws IllegalArgumentException if the configured number of fields is not positive
+   */
+  public KeylessKeyGenerator(TypedProperties props) {
+    super(props);
+    this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+    if (numFieldsForKey <= 0) {
+      // zero would hash no values (every record gets the same key) and a negative value is an invalid queue capacity
+      throw new IllegalArgumentException(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key() + " must be a positive integer, but was " + numFieldsForKey);
+    }
+    // cap the number of fields to order in case of large schemas
+    this.maxFieldsToConsider = numFieldsForKey * 3;
+    this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+  }
+
+  /**
+   * Builds a deterministic, UUID formatted key from the highest ranked non-null fields of the record.
+   * @param record the record that needs a key
+   * @return the generated key
+   */
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return buildKey(getFieldOrdering(record), record);
+  }
+
+  /**
+   * Returns the index paths of the fields used for keys, most preferred first. The ordering is built from the schema of the first record
+   * passed in and then cached; later calls return the cached ordering regardless of the schema of the given record.
+   */
+  int[][] getFieldOrdering(GenericRecord genericRecord) {
+    if (fieldOrdering == null) {
+      fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields());
+    }
+    return fieldOrdering;
+  }
+
+  /**
+   * Deterministically builds a key for the input value based on the provided fieldOrdering. The hash codes of the first {@link #numFieldsForKey}
+   * non-null values are concatenated into a string that is passed to {@link UUID#nameUUIDFromBytes(byte[])}.
+   * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object.
+   * @param input the input object that needs a key
+   * @return a deterministically generated {@link UUID} in string form
+   */
+  private String buildKey(int[][] fieldOrdering, GenericRecord input) {
+    StringBuilder key = new StringBuilder();
+    int nonNullFields = 0;
+    for (int[] index : fieldOrdering) {
+      Object value = getFieldForRecord(input, index);
+      if (value == null) {
+        continue;
+      }
+      nonNullFields++;
+      // hash codes are appended without a separator; changing this format would change every generated key
+      key.append(value.hashCode());
+      if (nonNullFields >= numFieldsForKey) {
+        break;
+      }
+    }
+    return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString();
+  }
+
+  /**
+   * Gets the value of the field at the specified path within the record.
+   * @param record the input record
+   * @param fieldPath the path to the field as an array of integers representing the field position within the object
+   * @return value at the path, or null if the value, or any value enclosing it along the path, is null or not a record
+   */
+  private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) {
+    Object value = record;
+    for (int index : fieldPath) {
+      // a null parent (or a non-record value of a union with several non-null branches) has no nested field
+      if (!(value instanceof GenericRecord)) {
+        return null;
+      }
+      value = ((GenericRecord) value).get(index);
+    }
+    return value;
+  }
+
+  /**
+   * Ranks the leaf fields of the schema, walking nested records breadth-first, and keeps the best {@link #maxFieldsToConsider} of them.
+   * Fields used in the partition path are skipped.
+   * @param initialFields the top-level fields of the record schema
+   * @return index paths of the kept fields, ordered from most to least preferred
+   */
+  private int[][] buildFieldOrdering(List<Schema.Field> initialFields) {
+    // min-heap by preference, so the least preferred field is evicted once the cap is exceeded
+    PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance());
+    Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>();
+    for (int j = 0; j < initialFields.size(); j++) {
+      fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name()));
+    }
+    while (!fieldsToProcess.isEmpty()) {
+      FieldToProcess fieldToProcess = fieldsToProcess.poll();
+      int[] existingPath = fieldToProcess.getIndexPath();
+      // unwrap nullable unions whether "null" is the first or the last branch
+      Schema fieldSchema = resolveNonNullSchema(fieldToProcess.getField().schema());
+      if (fieldSchema.getType() == Schema.Type.RECORD) {
+        List<Schema.Field> nestedFields = fieldSchema.getFields();
+        for (int i = 0; i < nestedFields.size(); i++) {
+          int[] path = Arrays.copyOf(existingPath, existingPath.length + 1);
+          path[existingPath.length] = i;
+          Schema.Field nestedField = nestedFields.get(i);
+          fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name()));
+        }
+      } else if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) {
+        // leaf field that is not used in partitioning
+        queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField())));
+        if (queue.size() > maxFieldsToConsider) {
+          queue.poll();
+        }
+      }
+    }
+    // most preferred first; RankingComparator is a total order on distinct paths, so the result does not depend on the heap layout
+    return queue.stream()
+        .sorted(RankingComparator.getInstance().reversed())
+        .map(Pair::getLeft)
+        .toArray(int[][]::new);
+  }
+
+  /**
+   * Returns the first non-null branch of a union schema, or the schema itself if it is not a union or has no non-null branch.
+   */
+  private static Schema resolveNonNullSchema(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+    for (Schema branch : schema.getTypes()) {
+      if (branch.getType() != Schema.Type.NULL) {
+        return branch;
+      }
+    }
+    return schema;
+  }
+
+  /**
+   * A schema field queued during the breadth-first walk, with its index path and dot separated name path.
+   */
+  private static class FieldToProcess {
+    final int[] indexPath;
+    final Schema.Field field;
+    final String namePath;
+
+    public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) {
+      this.indexPath = indexPath;
+      this.field = field;
+      this.namePath = namePath;
+    }
+
+    public int[] getIndexPath() {
+      return indexPath;
+    }
+
+    public Schema.Field getField() {
+      return field;
+    }
+
+    public String getNamePath() {
+      return namePath;
+    }
+  }
+
+  /**
+   * Ranks the fields by their type; a higher score means the field is more likely to be unique and is preferred for the key.
+   * @param field input field
+   * @return a score of 0 to 4
+   */
+  private int getSchemaRanking(Schema.Field field) {
+    if (field.name().startsWith(HOODIE_PREFIX)) {
+      return 0;
+    }
+    Schema schema = resolveNonNullSchema(field.schema());
+    switch (schema.getType()) {
+      case LONG:
+        // assumes long with logical type will be a timestamp
+        return schema.getLogicalType() != null ? 4 : 3;
+      case INT:
+        // assumes int with logical type will be a date which will have low variance in a batch
+        return schema.getLogicalType() != null ? 1 : 3;
+      case DOUBLE:
+      case FLOAT:
+        return 3;
+      case BOOLEAN:
+      case MAP:
+      case ARRAY:
+        return 1;
+      default:
+        return 2;
+    }
+  }
+
+  /**
+   * Orders (index path, ranking) pairs from least to most preferred: lower ranking first, then more deeply nested paths, then paths that
+   * appear later in the schema. Only identical paths compare as equal.
+   */
+  private static class RankingComparator implements Comparator<Pair<int[], Integer>> {
+    private static final RankingComparator INSTANCE = new RankingComparator();
+
+    static RankingComparator getInstance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) {
+      int rankingResult = Integer.compare(o1.getRight(), o2.getRight());
+      if (rankingResult != 0) {
+        return rankingResult;
+      }
+      int[] path1 = o1.getLeft();
+      int[] path2 = o2.getLeft();
+      // favor the smaller list (less nested value) on ties
+      int sizeResult = Integer.compare(path2.length, path1.length);
+      if (sizeResult != 0) {
+        return sizeResult;
+      }
+      // then favor the field declared earlier, comparing the whole path so that sibling fields of one nested record differ
+      for (int i = 0; i < path1.length; i++) {
+        int positionResult = Integer.compare(path2[i], path1[i]);
+        if (positionResult != 0) {
+          return positionResult;
+        }
+      }
+      return 0;
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
new file mode 100644
index 00000000000..af6b30e3f09
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/**
+ * Tests for {@link KeylessKeyGenerator} against the schema defined in {@code keyless_schema.avsc}.
+ */
+public class TestKeylessKeyGenerator {
+  private static final long TIME = 1672265446090L;
+  private static final Schema SCHEMA = loadSchema();
+
+  @Test
+  public void createKeyWithoutPartitionColumn() {
+    KeylessKeyGenerator generator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+    GenericRecord input = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", generator.getRecordKey(input));
+  }
+
+  @Test
+  public void createKeyWithPartition() {
+    KeylessKeyGenerator generator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
+    GenericRecord input = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", generator.getRecordKey(input));
+  }
+
+  @Test
+  public void nullFieldsProperlyHandled() {
+    // only the two string fields are non-null, so fewer than the requested three values feed the key
+    KeylessKeyGenerator generator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+    GenericRecord input = createRecord("partition1", "value1", null, null, null, null);
+    assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", generator.getRecordKey(input));
+  }
+
+  @Test
+  public void assertOnlySubsetOfFieldsUsed() {
+    // the string fields rank below the numeric and timestamp fields, so with three key fields changing them must not change the key
+    KeylessKeyGenerator generator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+    String firstKey = generator.getRecordKey(createRecord("partition1", "value1", 123, 456L, TIME, null));
+    String secondKey = generator.getRecordKey(createRecord("partition2", "value2", 123, 456L, TIME, null));
+    assertEquals(secondKey, firstKey);
+  }
+
+  @Test
+  public void numFieldsImpactsKeyGen() {
+    KeylessKeyGenerator threeFieldGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+    KeylessKeyGenerator tenFieldGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+    GenericRecord input = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    assertNotEquals(threeFieldGenerator.getRecordKey(input), tenFieldGenerator.getRecordKey(input));
+  }
+
+  @Test
+  public void nestedColumnsUsed() {
+    KeylessKeyGenerator generator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+    GenericRecord input = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
+    assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", generator.getRecordKey(input));
+  }
+
+  /**
+   * Builds a record for the test schema; a null {@code nestedDouble} sets {@code nested_struct} to null.
+   */
+  protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
+    GenericRecord nestedRecord = nestedDouble == null
+        ? null
+        : new GenericRecordBuilder(SCHEMA.getField("nested_struct").schema().getTypes().get(1)).set("doubly_nested", nestedDouble).build();
+    return new GenericRecordBuilder(SCHEMA)
+        .set("partition_field", partitionField)
+        .set("string_field", stringValue)
+        .set("integer_field", integerValue)
+        .set("long_field", longValue)
+        .set("timestamp_field", timestampValue)
+        .set("nested_struct", nestedRecord)
+        .build();
+  }
+
+  /**
+   * Builds key generator properties with an empty record key field, the given partition path fields and key field count.
+   */
+  protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
+    TypedProperties props = new TypedProperties();
+    props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
+    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
+    props.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
+    return props;
+  }
+
+  /**
+   * Loads the Avro schema used by these tests from the test classpath.
+   */
+  private static Schema loadSchema() {
+    try {
+      return new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc
new file mode 100644
index 00000000000..2966841eef6
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc
@@ -0,0 +1,44 @@
+{
+  "namespace": "keyless",
+  "type": "record",
+  "name": "message",
+  "fields": [
+    {
+      "name": "partition_field",
+      "type": "string"
+    },
+    {
+      "name": "string_field",
+      "type": "string"
+    },
+    {
+      "name": "integer_field",
+      "type": ["null", "int"],
+      "default": null
+    },
+    {
+      "name": "long_field",
+      "type": ["null", "long"],
+      "default": null
+    },
+    {
+      "name": "timestamp_field",
+      "type": ["null", {"type":"long","logicalType":"timestamp-millis"}],
+      "default": null
+    },
+    {
+      "name": "nested_struct",
+      "type": ["null", {
+        "type": "record",
+        "name": "nested",
+        "fields": [
+          {
+            "name": "doubly_nested",
+            "type": "double"
+          }
+        ]
+      }],
+      "default": null
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
index 99d40439b7c..b0a46ac0676 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -69,6 +69,12 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. "
           + "If enabled, then the timestamp value will be written in both the cases.");
 
+  public static final ConfigProperty<Integer> NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty
+      .key("hoodie.datasource.write.recordkey.keyless.field.count")
+      .defaultValue(5)
+      .withDocumentation("When using the KeylessKeyGenerator, this sets the number of non-null field values used to compute the UUID for the record. "
+          + "Increasing the value reduces the likelihood of key collisions (the keys stay deterministic) but can impact performance.");
+
   /**
    * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods.
    */