You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/12 15:04:30 UTC
[hudi] branch master updated: [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eacae1e0dc4 [HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)
eacae1e0dc4 is described below
commit eacae1e0dc44e29bbe5cfc29475666bb16ee478d
Author: Tim Brown <ti...@onehouse.ai>
AuthorDate: Thu Jan 12 07:04:17 2023 -0800
[HUDI-5514] Add in support for a keyless workflow by building an ID based off of values within the record (#7640)
- Adds a new KeyGenerator that does not require the user to specify any fields to use for the record key and instead deterministically generates a UUID based off a subset of fields in the incoming record.
---
.../apache/hudi/keygen/KeylessKeyGenerator.java | 239 +++++++++++++++++++++
.../hudi/keygen/TestKeylessKeyGenerator.java | 119 ++++++++++
.../src/test/resources/keyless_schema.avsc | 44 ++++
.../hudi/keygen/constant/KeyGeneratorOptions.java | 6 +
4 files changed, 408 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
new file mode 100644
index 00000000000..d487e7e1ff9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to
+ * create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled.
+ * The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is:
+ * <ul>
+ * <li>timestamp</li>
+ * <li>numeric values</li>
+ * <li>string, byte arrays, other types not mentioned</li>
+ * <li>date, lists, maps, booleans</li>
+ * </ul>
+ * The number of fields is capped to created predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets.
+ */
+public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
+ private static final String HOODIE_PREFIX = "_hoodie";
+ private static final String DOT = ".";
+ private final int maxFieldsToConsider;
+ private final int numFieldsForKey;
+ private final Set<String> partitionFieldNames;
+ private int[][] fieldOrdering;
+
+ public KeylessKeyGenerator(TypedProperties props) {
+ super(props);
+ this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+ // cap the number of fields to order in case of large schemas
+ this.maxFieldsToConsider = numFieldsForKey * 3;
+ this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+ }
+
+ @Override
+ public String getRecordKey(GenericRecord record) {
+ return buildKey(getFieldOrdering(record), record);
+ }
+
+ int[][] getFieldOrdering(GenericRecord genericRecord) {
+ if (fieldOrdering == null) {
+ fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields());
+ }
+ return fieldOrdering;
+ }
+
+ /**
+ * Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to
+ * {@link UUID#nameUUIDFromBytes(byte[])}.
+ * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object.
+ * @param input the input object that needs a key
+ * @return a deterministically generated {@link UUID}
+ * @param <T> the input object type
+ */
+ private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) {
+ StringBuilder key = new StringBuilder();
+ int nonNullFields = 0;
+ for (int[] index : fieldOrdering) {
+ Object value = getFieldForRecord(input, index);
+ if (value == null) {
+ continue;
+ }
+ nonNullFields++;
+ key.append(value.hashCode());
+ if (nonNullFields >= numFieldsForKey) {
+ break;
+ }
+ }
+ return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString();
+ }
+
+ /**
+ * Gets the value of the field at the specified path within the record.
+ * @param record the input record
+ * @param fieldPath the path to the field as an array of integers representing the field position within the object
+ * @return value at the path
+ */
+ private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) {
+ Object value = record;
+ for (Integer index : fieldPath) {
+ if (value == null) {
+ return null;
+ }
+ value = ((GenericRecord) value).get(index);
+ }
+ return value;
+ }
+
+ private int[][] buildFieldOrdering(List<Schema.Field> initialFields) {
+ PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance());
+ Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>();
+ for (int j = 0; j < initialFields.size(); j++) {
+ fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name()));
+ }
+ while (!fieldsToProcess.isEmpty()) {
+ FieldToProcess fieldToProcess = fieldsToProcess.poll();
+ int[] existingPath = fieldToProcess.getIndexPath();
+ Schema fieldSchema = fieldToProcess.getField().schema();
+ if (fieldSchema.getType() == Schema.Type.UNION) {
+ fieldSchema = fieldSchema.getTypes().get(1);
+ }
+ if (fieldSchema.getType() == Schema.Type.RECORD) {
+ List<Schema.Field> nestedFields = fieldSchema.getFields();
+ for (int i = 0; i < nestedFields.size(); i++) {
+ int[] path = Arrays.copyOf(existingPath, existingPath.length + 1);
+ path[existingPath.length] = i;
+ Schema.Field nestedField = nestedFields.get(i);
+ fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name()));
+ }
+ } else {
+ // check that field is not used in partitioning
+ if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) {
+ queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField())));
+ if (queue.size() > maxFieldsToConsider) {
+ queue.poll();
+ }
+ }
+ }
+ }
+ Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]);
+ Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed());
+ int[][] output = new int[sortedPairs.length][];
+ for (int k = 0; k < sortedPairs.length; k++) {
+ output[k] = sortedPairs[k].getLeft();
+ }
+ return output;
+ }
+
+ private static class FieldToProcess {
+ final int[] indexPath;
+ final Schema.Field field;
+ final String namePath;
+
+ public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) {
+ this.indexPath = indexPath;
+ this.field = field;
+ this.namePath = namePath;
+ }
+
+ public int[] getIndexPath() {
+ return indexPath;
+ }
+
+ public Schema.Field getField() {
+ return field;
+ }
+
+ public String getNamePath() {
+ return namePath;
+ }
+ }
+
+ /**
+ * Ranks the fields by their type.
+ * @param field input field
+ * @return a score of 0 to 4
+ */
+ private int getSchemaRanking(Schema.Field field) {
+ if (field.name().startsWith(HOODIE_PREFIX)) {
+ return 0;
+ }
+ Schema schema = field.schema();
+ if (schema.getType() == Schema.Type.UNION) {
+ schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? schema.getTypes().get(1) : schema.getTypes().get(0);
+ }
+ Schema.Type type = schema.getType();
+ switch (type) {
+ case LONG:
+ // assumes long with logical type will be a timestamp
+ return schema.getLogicalType() != null ? 4 : 3;
+ case INT:
+ // assumes long with logical type will be a date which will have low variance in a batch
+ return schema.getLogicalType() != null ? 1 : 3;
+ case DOUBLE:
+ case FLOAT:
+ return 3;
+ case BOOLEAN:
+ case MAP:
+ case ARRAY:
+ return 1;
+ default:
+ return 2;
+ }
+ }
+
+ private static class RankingComparator implements Comparator<Pair<int[], Integer>> {
+ private static final RankingComparator INSTANCE = new RankingComparator();
+
+ static RankingComparator getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) {
+ int initialResult = o1.getRight().compareTo(o2.getRight());
+ if (initialResult == 0) {
+ // favor the smaller list (less nested value) on ties
+ int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length);
+ if (sizeResult == 0) {
+ return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]);
+ }
+ return sizeResult;
+ }
+ return initialResult;
+ }
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
new file mode 100644
index 00000000000..af6b30e3f09
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TestKeylessKeyGenerator {
+ private static final long TIME = 1672265446090L;
+ private static final Schema SCHEMA;
+
+ static {
+ try {
+ SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Test
+ public void createKeyWithoutPartitionColumn() {
+ KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+ GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ String actualForRecord = keyGenerator.getRecordKey(record);
+ Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
+ }
+
+ @Test
+ public void createKeyWithPartition() {
+ KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
+ GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ String actualForRecord = keyGenerator.getRecordKey(record);
+ Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
+ }
+
+ @Test
+ public void nullFieldsProperlyHandled() {
+ KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+ GenericRecord record = createRecord("partition1", "value1", null, null, null, null);
+ String actualForRecord = keyGenerator.getRecordKey(record);
+ Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord);
+ }
+
+ @Test
+ public void assertOnlySubsetOfFieldsUsed() {
+ KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+ GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ String actualForRecord1 = keyGenerator.getRecordKey(record1);
+ GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null);
+ String actualForRecord2 = keyGenerator.getRecordKey(record2);
+ Assertions.assertEquals(actualForRecord2, actualForRecord1);
+ }
+
+ @Test
+ public void numFieldsImpactsKeyGen() {
+ KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3));
+ KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+ GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record));
+ }
+
+ @Test
+ public void nestedColumnsUsed() {
+ KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+ GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
+ String actualForRecord = keyGenerator.getRecordKey(record);
+ Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord);
+ }
+
+ protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
+ GenericRecord nestedRecord = null;
+ if (nestedDouble != null) {
+ nestedRecord = new GenericRecordBuilder(SCHEMA.getField("nested_struct").schema().getTypes().get(1))
+ .set("doubly_nested", nestedDouble)
+ .build();
+ }
+
+ return new GenericRecordBuilder(SCHEMA)
+ .set("partition_field", partitionField)
+ .set("string_field", stringValue)
+ .set("integer_field", integerValue)
+ .set("long_field", longValue)
+ .set("timestamp_field", timestampValue)
+ .set("nested_struct", nestedRecord)
+ .build();
+ }
+
+ protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
+ TypedProperties properties = new TypedProperties();
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
+ properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
+ return properties;
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc
new file mode 100644
index 00000000000..2966841eef6
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/keyless_schema.avsc
@@ -0,0 +1,44 @@
+{
+ "namespace": "keyless",
+ "type": "record",
+ "name": "message",
+ "fields": [
+ {
+ "name": "partition_field",
+ "type": "string"
+ },
+ {
+ "name": "string_field",
+ "type": "string"
+ },
+ {
+ "name": "integer_field",
+ "type": ["null", "int"],
+ "default": null
+ },
+ {
+ "name": "long_field",
+ "type": ["null", "long"],
+ "default": null
+ },
+ {
+ "name": "timestamp_field",
+ "type": ["null", {"type":"long","logicalType":"timestamp-millis"}],
+ "default": null
+ },
+ {
+ "name": "nested_struct",
+ "type": ["null", {
+ "type": "record",
+ "name": "nested",
+ "fields": [
+ {
+ "name": "doubly_nested",
+ "type": "double"
+ }
+ ]
+ }],
+ "default": null
+ }
+ ]
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
index 99d40439b7c..b0a46ac0676 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -69,6 +69,12 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. "
+ "If enabled, then the timestamp value will be written in both the cases.");
+  public static final ConfigProperty<Integer> NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty
+      .key("hoodie.datasource.write.recordkey.keyless.field.count")
+      .defaultValue(5)
+      .withDocumentation("When using the KeylessKeyGenerator, this sets the number of non-null fields to use when computing the UUID for the record. "
+          + "Increasing the value reduces the likelihood of key collisions but can impact performance. The value should be a positive integer.");
+
/**
* @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods.
*/