You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/10/27 01:45:27 UTC

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3330: [HUDI-2101][RFC-28]support z-order for hudi

xiarixiaoyao commented on a change in pull request #3330:
URL: https://github.com/apache/hudi/pull/3330#discussion_r737040967



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Zoptimize$;
+
+/**
+ * A partitioner that does spartial curve optimization sorting based on specified column values for each RDD partition.
+ * support z-curve optimization, hilbert will come soon.
+ * @param <T> HoodieRecordPayload type
+ */
+public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPayload>
+    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
+  private final HoodieSparkEngineContext sparkEngineContext;
+  private final SerializableSchema serializableSchema;
+  private final HoodieWriteConfig config;
+
+  public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext sparkEngineContext, HoodieWriteConfig config, Schema schema) {
+    this.sparkEngineContext = sparkEngineContext;
+    this.config = config;
+    this.serializableSchema = new SerializableSchema(schema);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+    String payloadClass = config.getPayloadClass();
+    // do sort
+    JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
+    return preparedRecord.map(record -> {
+      String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+      HoodieKey hoodieKey = new HoodieKey(key, partition);
+      HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass,
+          new Object[] {Option.of(record)}, Option.class);
+      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
+      return hoodieRecord;
+    });
+  }
+
+  private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
+    SerializableSchema serializableSchema = new SerializableSchema(schema);
+    JavaRDD<GenericRecord> genericRecordJavaRDD =  inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
+    Dataset<Row> originDF =  AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession());
+    Dataset<Row> zDataFrame;
+
+    switch (config.getOptimizeBuildCurveMethod()) {
+      case DIRECT:
+        zDataFrame = Zoptimize$.MODULE$.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);

Review comment:
       @YannByron  thanks for you review。 sorry for that,yes directly DIRECT to createZIndexedDataFrameByMapValue




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org