Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/20 03:32:24 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r974403023


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordCompatibilityInterface.java:
##########
@@ -18,21 +18,28 @@
 
 package org.apache.hudi.common.model;
 
+import java.io.IOException;
+import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
+public interface HoodieRecordCompatibilityInterface {
 
-/**
- * HoodieMerge defines how to merge two records. It is a stateless component.
- * It can implement the merging logic of HoodieRecord of different engines
- * and avoid the performance consumption caused by the serialization/deserialization of Avro payload.
- */
-public interface HoodieMerge extends Serializable {
-  
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+  /**
+   * This method is used to extract the HoodieKey without going through the keyGenerator.
+   */
+  HoodieRecord wrapIntoHoodieRecordPayloadWithParams(

Review Comment:
   👍 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -147,19 +141,19 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
   }
 
   @Override
-  protected void processNextRecord(HoodieRecord hoodieRecord) throws IOException {
+  protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws IOException {
     String key = hoodieRecord.getRecordKey();
     if (records.containsKey(key)) {
       // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
       // done when a DELETE (empty payload) is encountered before or after an insert/update.
 
-      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
-      HoodieRecordPayload oldValue = oldRecord.getData();
-      HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData();
+      HoodieRecord<T> oldRecord = records.get(key);
+      T oldValue = oldRecord.getData();
+      T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps()).get()).getData();
       // If combinedValue is oldValue, no need rePut oldRecord
       if (combinedValue != oldValue) {
-        HoodieOperation operation = hoodieRecord.getOperation();
-        records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
+        hoodieRecord.setData(combinedValue);

Review Comment:
   Why are we resetting the data instead of using the new `HoodieRecord` returned by the Merger?
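   
   Something along these lines is what I had in mind -- just a sketch reusing calls already visible in this diff, not prescribing the final shape:
   ```java
   // keep the HoodieRecord returned by the merger and re-key it, instead of
   // mutating the incoming record via setData(...)
   HoodieRecord<T> merged = (HoodieRecord<T>) recordMerger
       .merge(oldRecord, hoodieRecord, readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps())
       .get();
   if (merged.getData() != oldValue) {
     records.put(key, merged.newInstance(
         new HoodieKey(key, hoodieRecord.getPartitionPath()), hoodieRecord.getOperation()));
   }
   ```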



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -126,11 +129,17 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
 
-  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
-      .key("hoodie.datasource.write.merge.class")
-      .defaultValue(HoodieAvroRecordMerge.class.getName())
-      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
-          + "types, such as Spark records or Flink records.");
+  public static final ConfigProperty<String> MERGER_IMPLS = ConfigProperty
+      .key("hoodie.datasource.write.merger.impls")
+      .defaultValue(HoodieAvroRecordMerger.class.getName())
+      .withDocumentation("List of HoodieMerger implementations constituting Hudi's merging strategy -- based on the engine used. "
+          + "These merger impls will be filtered by hoodie.datasource.write.merger.strategy. "
+          + "Hudi will pick the most efficient implementation to perform merging/combining of the records (during update, reading MOR table, etc.)");
+
+  public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
+      .key("hoodie.datasource.write.merger.strategy")
+      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)

Review Comment:
   Let's move this to HoodieMerger, rather than `StringUtils` (we can do it in a follow-up)
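   
   A rough sketch of what the follow-up could look like (assuming the constant is hung off the merger interface; names are not final):
   ```java
   public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
       .key("hoodie.datasource.write.merger.strategy")
       // constant relocated from StringUtils onto the merger API
       .defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
       .withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in "
           + "hoodie.datasource.write.merger.impls which have the same merger strategy id");
   ```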



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -156,11 +155,10 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
-  public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
-      .key("hoodie.compaction.merge.class")
-      .defaultValue(HoodieAvroRecordMerge.class.getName())
-      .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
-          + "types, such as Spark records or Flink records.");
+  public static final ConfigProperty<String> MERGER_STRATEGY = ConfigProperty
+      .key("hoodie.compaction.merger.strategy")
+      .defaultValue(StringUtils.DEFAULT_MERGER_STRATEGY_UUID)
+      .withDocumentation("Id of merger strategy.  Hudi will pick RecordMergers in hoodie.datasource.write.merger.impls which has the same merger strategy id");

Review Comment:
   nit: `HoodieRecordMerger` implementations



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -244,15 +242,24 @@ public class HoodieTableConfig extends HoodieConfig {
 
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
 
-  public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
+  public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName, String mergerStrategy) {

Review Comment:
   nit: `mergerStrategyId`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+      Option<String> partitionName) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+      boolean withOperationField, Option<String> partitionName) {
+    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
+    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+        getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
+
+    HoodieOperation operation = withOperationField
+        ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
+  }
+
+  private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+    NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+    return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+  }
+
+  /**
+   * Returns the string value of the given record {@code rec} and field {@code fieldName}. The field and value both could be missing.
+   *
+   * @param row       The record
+   * @param fieldName The field name
+   * @return the string form of the field or empty if the schema does not contain the field name or the value is null
+   */
+  private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {

Review Comment:
   I don't think we need this method (we can use `getValue(...).toString` instead)
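   
   Rough illustration of what I mean (this assumes the operation meta column is present in `structType`, which `getCachedPosList` requires anyway):
   ```java
   // read the value directly and only wrap it for null handling
   Object opVal = getValue(structType, HoodieRecord.OPERATION_METADATA_FIELD, data);
   HoodieOperation operation = withOperationField
       ? HoodieOperation.fromName(Option.ofNullable(opVal == null ? null : opVal.toString()))
       : null;
   ```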



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -291,59 +284,51 @@ public void checkState() {
     }
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
-  //       for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
-  //       is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;
 
   /**
-   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
+   * Rewrite record into new schema(add meta columns)
    */
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException;
+  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;
 
-  public abstract HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException;
+  /**
+   * Support schema evolution.
+   */
+  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException;
 
-  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException;
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException {
+    return rewriteRecordWithNewSchema(recordSchema, props, newSchema, Collections.emptyMap());
+  }
 
-  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException;
+  /**
+   * This method could change in the future.
+   * @temporary
+   */
+  public abstract HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException;
 
-  public abstract HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException;
+  public abstract boolean isDelete(Schema schema, Properties props) throws IOException;
 
-  public abstract HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException;
+  /**
+   * Is EmptyRecord. Generated by ExpressionPayload.
+   */
+  public abstract boolean shouldIgnore(Schema schema, Properties props) throws IOException;
 
-  public abstract HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException;
+  public abstract Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;

Review Comment:
   Let's move this method to `HoodieCompatibilityInterface`



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -20,33 +20,27 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
 public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {

Review Comment:
   We should rename this to `HoodieLegacyAvroRecord` (to make it clearer that this will eventually go away)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+      Option<String> partitionName) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+      boolean withOperationField, Option<String> partitionName) {
+    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();

Review Comment:
   We can't do that; we need to use key-gen to fetch both the record-key and the partition-path
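   
   A hedged sketch of the idea (it assumes a `getPartitionPath(InternalRow, StructType)` accessor alongside `getRecordKey` on `SparkKeyGeneratorInterface`, and `keyGenerator` stands for whatever generator the caller passes in):
   ```java
   SparkKeyGeneratorInterface keyGen = (SparkKeyGeneratorInterface) keyGenerator;
   String recKey = keyGen.getRecordKey(data, structType).toString();
   String partitionPath = partitionName.isPresent()
       ? partitionName.get()
       : keyGen.getPartitionPath(data, structType).toString();
   HoodieKey hoodieKey = new HoodieKey(recKey, partitionPath);
   ```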



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, boolean withOperationField,
+      Option<String> partitionName) {
+    return convertToHoodieSparkRecord(structType, data,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, Pair<String, String> recordKeyPartitionPathFieldPair,
+      boolean withOperationField, Option<String> partitionName) {
+    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
+    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+        getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
+
+    HoodieOperation operation = withOperationField
+        ? HoodieOperation.fromName(getNullableValAsString(structType, data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+    return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath), data, structType, operation);
+  }
+
+  private static Object getValue(StructType structType, String fieldName, InternalRow row) {
+    NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
+    return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+  }
+
+  /**
+   * Returns the string value of the given record {@code rec} and field {@code fieldName}. The field and value both could be missing.
+   *
+   * @param row       The record
+   * @param fieldName The field name
+   * @return the string form of the field or empty if the schema does not contain the field name or the value is null
+   */
+  private static Option<String> getNullableValAsString(StructType structType, InternalRow row, String fieldName) {
+    String fieldVal = !HoodieCatalystExpressionUtils$.MODULE$.existField(structType, fieldName)
+        ? null : StringUtils.objToString(getValue(structType, fieldName, row));
+    return Option.ofNullable(fieldVal);
+  }
+
+  /**
+   * Gets record column values into one object.
+   *
+   * @param row  InternalRow.
+   * @param columns Names of the columns to get values.
+   * @param structType  {@link StructType} instance.
+   * @return Column value if a single column, or concatenated String values by comma.
+   */
+  public static Object getRecordColumnValues(InternalRow row,

Review Comment:
   Please check my other comment about making this method return an array of objects instead of a concatenated string
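   
   Sketch of the suggested shape (hypothetical; it just reuses `getValue(...)` from above and lets callers decide how to combine the values):
   ```java
   public static Object[] getRecordColumnValues(InternalRow row, String[] columns, StructType structType) {
     Object[] values = new Object[columns.length];
     for (int i = 0; i < columns.length; i++) {
       values[i] = getValue(structType, columns[i], row);
     }
     return values;
   }
   ```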



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -291,59 +284,51 @@ public void checkState() {
     }
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-
-  //
-  // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here
-  //       for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload`
-  //       is complete
-  //
-  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException;
+  /**
+   * Get column in record to support RDDCustomColumnsSortPartitioner
+   */
+  public abstract Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);
 
-  public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException;
+  /**
+   * Support bootstrap.
+   */
+  public abstract HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException;

Review Comment:
   We should not have this method on `HoodieRecord` -- it should be implemented w/in `HoodieRecordMerger` itself
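   
   For illustration only (shape assumed; it reuses only calls already visible in this PR), the bootstrap-style stitch could live in a Spark-specific merger:
   ```java
   @Override
   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
     StructType olderType = ((HoodieSparkRecord) older).getStructType();
     StructType newerType = ((HoodieSparkRecord) newer).getStructType();
     StructType targetType = HoodieInternalRowUtils.getCachedSchema(schema);
     InternalRow stitched = HoodieInternalRowUtils.stitchRecords(
         (InternalRow) older.getData(), olderType, (InternalRow) newer.getData(), newerType, targetType);
     return Option.of(new HoodieSparkRecord(newer.getKey(), stitched, targetType, newer.getOperation()));
   }
   ```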



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -213,6 +200,10 @@ public HoodieRecord setCurrentLocation(HoodieRecordLocation location) {
     return this;
   }
 
+  public void setData(T data) {

Review Comment:
   We should avoid any mutating methods in the interface
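   
   One non-mutating alternative (hypothetical signature, in the spirit of the existing `newInstance(...)` overloads):
   ```java
   // return a copy carrying the new payload instead of mutating in place
   public abstract HoodieRecord<T> newInstance(T data);
   
   // caller side, instead of hoodieRecord.setData(combinedValue):
   //   records.put(key, hoodieRecord.newInstance(combinedValue));
   ```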



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  private StructType structType = null;

Review Comment:
   We should make this `transient` to make sure we don't accidentally serialize schema along w/ every record
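   
   Sketch of what I mean (the schema can be re-attached on the read side, e.g. via `HoodieInternalRowUtils.getCachedSchema`):
   ```java
   private transient StructType structType = null;
   private Option<Long> schemaFingerPrint = Option.empty();
   ```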



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java:
##########
@@ -20,18 +20,41 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 public class HoodieFileReaderFactory {
 
-  public static HoodieAvroFileReader getFileReader(Configuration conf, Path path) throws IOException {
+  public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return HoodieAvroFileReaderFactory.getFileReaderFactory();
+      case SPARK:
+        try {
+          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
+          Method method = clazz.getMethod("getFileReaderFactory", null);

Review Comment:
   A few nits:
    - We don't need to make FileFactory a singleton; we can instantiate it every time
    - That makes this much simpler: we can just call `newInstance` (see the sketch below)
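   
   Rough sketch of the simplification (it assumes a public no-arg constructor on the Spark factory):
   ```java
   case SPARK:
     try {
       Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
       return (HoodieFileReaderFactory) clazz.newInstance();
     } catch (InstantiationException | IllegalAccessException e) {
       throw new HoodieException("Unable to instantiate HoodieSparkFileReaderFactory", e);
     }
   ```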



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils$;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  private StructType structType = null;
+  private Option<Long> schemaFingerPrint = Option.empty();
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    initSchema(schema);
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+    super(key, data);
+    initSchema(schema);
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation) {
+    super(key, data, operation);
+    initSchema(schema);
+  }
+
+  public HoodieSparkRecord(HoodieSparkRecord record) {
+    super(record);
+    initSchema(record.getStructType());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, getStructType(), op);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, getStructType());
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
+        .getRecordKey(data, getStructType()).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    DataType dataType = getStructType().apply(keyFieldName).dataType();
+    int pos = getStructType().fieldIndex(keyFieldName);
+    return data.get(pos, dataType).toString();
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+
+  @Override
+  public Object getRecordColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, getStructType(), consistentLogicalTimestampEnabled);
+  }
+
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema targetSchema) throws IOException {
+    StructType otherStructType = ((HoodieSparkRecord) other).getStructType();
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, getStructType(), (InternalRow) other.getData(), otherStructType, writerStructType);
+    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    UTF8String[] metaFields = extractMetaField(targetStructType);
+    if (metaFields.length == 0) {
+      throw new UnsupportedOperationException();
+    }
+
+    InternalRow resultRow;
+    if (extractMetaField(getStructType()).length == 0) {
+      resultRow = new HoodieInternalRow(metaFields, data, false);
+    } else {
+      resultRow = new HoodieInternalRow(metaFields, data, true);
+    }
+
+    return new HoodieSparkRecord(getKey(), resultRow, targetStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, getStructType(), newStructType, renameCols);
+    UnsafeProjection unsafeConvert = HoodieInternalRowUtils.getCachedUnsafeConvert(newStructType);
+    InternalRow resultRow = unsafeConvert.apply(rewriteRow);
+    UTF8String[] metaFields = extractMetaField(newStructType);
+    if (metaFields.length > 0) {
+      resultRow = new HoodieInternalRow(metaFields, data, true);
+    }
+
+    return new HoodieSparkRecord(getKey(), resultRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord updateValues(Schema recordSchema, Properties props, Map<String, String> metadataValues) throws IOException {
+    metadataValues.forEach((key, value) -> {
+      int pos = getStructType().fieldIndex(key);
+      if (value != null) {
+        data.update(pos, CatalystTypeConverters.convertToCatalyst(value));
+      }
+    });
+
+    return new HoodieSparkRecord(getKey(), data, getStructType(), getOperation());
+  }
+
+  @Override
+  public boolean isDelete(Schema schema, Properties props) throws IOException {
+    if (null == data) {
+      return true;
+    }
+    if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+      return false;
+    }
+    Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+    return deleteMarker instanceof Boolean && (boolean) deleteMarker;
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties props) throws IOException {
+    if (data != null && data.equals(SENTINEL)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
+      Schema schema, Properties props,
+      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation,
+      Option<String> partitionNameOp,
+      Boolean populateMetaFields) {
+    if (populateMetaFields) {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(getStructType(), data, withOperation);

Review Comment:
   Let's move these methods into this class and make them private to limit their access (they should also be deprecated/removed after we deprecate this method)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -505,7 +514,9 @@ public class HoodieWriteConfig extends HoodieConfig {
   private HoodieMetadataConfig metadataConfig;
   private HoodieMetastoreConfig metastoreConfig;
   private HoodieCommonConfig commonConfig;
+  private HoodieStorageConfig storageConfig;
   private EngineType engineType;
+  private HoodieRecordMerger recordMerger;

Review Comment:
   I don't think we need to hold `recordMerger` -- we should instantiate it on the fly
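   
   Hedged sketch (helper/method names assumed, not asserting the final API):
   ```java
   // resolve the merger lazily from config instead of caching it on HoodieWriteConfig
   public HoodieRecordMerger getRecordMerger() {
     return HoodieRecordUtils.loadRecordMerger(getString(MERGER_IMPLS)); // hypothetical helper
   }
   ```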



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic spark records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {

Review Comment:
   Let's not forget that, most importantly, we need this serializer to be registered w/ Spark:
   https://spark.incubator.apache.org/docs/0.6.0/tuning.html#data-serialization
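   
   A hedged sketch of the wiring (registrar name assumed; without something like this on `spark.kryo.registrator`, Spark never picks up the custom serializer for `HoodieSparkRecord`):
   ```java
   import com.esotericsoftware.kryo.Kryo;
   import org.apache.spark.serializer.KryoRegistrator;
   
   public class HoodieSparkKryoRegistrar implements KryoRegistrator {
     @Override
     public void registerClasses(Kryo kryo) {
       // schema-id -> StructType map wiring elided; see the serializer's constructor above
       kryo.register(HoodieSparkRecord.class /*, new SparkStructTypeSerializer(...) */);
     }
   }
   
   // job/user side:
   //   sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   //            .set("spark.kryo.registrator", HoodieSparkKryoRegistrar.class.getName());
   ```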



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkStructTypeSerializer.scala:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.twitter.chill.KSerializer
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import org.apache.avro.SchemaNormalization
+import org.apache.commons.io.IOUtils
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, SparkException}
+import scala.collection.mutable
+
+/**
+ * Custom serializer used for generic spark records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ * @param schemas a map where the keys are unique IDs for spark schemas and the values are the
+ *                string representation of the Avro schema, used to decrease the amount of data
+ *                that needs to be serialized.
+ */
+class SparkStructTypeSerializer(schemas: Map[Long, StructType]) extends KSerializer[HoodieSparkRecord] {

Review Comment:
   This is rather `HoodieSparkRecordSerializer`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

Review Comment:
   @wzx140 I think we might have missed this comment



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -30,9 +34,19 @@
  * It can implement the merging logic of HoodieRecord of different engines
  * and avoid the performance consumption caused by the serialization/deserialization of Avro payload.
  */
-public interface HoodieMerge extends Serializable {
-  
-  HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer);
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieRecordMerger extends Serializable {
+
+  /**
+   * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
+   * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
+   * of the single record, both orders of operations applications have to yield the same result)
+   */
+  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;

Review Comment:
   @vinothchandar we've touched upon this w/ @prasannarajaperumal recently:
   
    - The initial take is that we're planning to have this method involved only when 2 records are merged (deletion is a sub-type of merge, where the second record is a sentinel). Insertions will bypass this method
    - Your concern regarding users who have custom logic in `getInsertValue` is valid, but we don't want to overload the API out of the gate; we'd rather start w/ a simple API and increase complexity as we get more signals about the other ways people are using it (if they do)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org