Posted to commits@hudi.apache.org by "xushiyan (via GitHub)" <gi...@apache.org> on 2023/04/06 00:03:38 UTC

[GitHub] [hudi] xushiyan opened a new pull request, #8390: [HUDI-5315] Use sample writes to estimate record size

xushiyan opened a new pull request, #8390:
URL: https://github.com/apache/hudi/pull/8390

   ### Change Logs
   
   To give the first commit a more accurate record size estimate, the writer can be configured to sample some incoming records, write them to `.hoodie/.aux/.sample_writes/`, and then read the average record size from that sample write for use in the actual write commit.
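
A minimal sketch of how the feature might be switched on through writer properties. The two keys are the ones defined in this PR's `HoodieWriteConfig` diff; the class and method names are hypothetical, and the sample size shown is only the default from the reviewed revision, which may differ in the merged code.

```java
import java.util.Properties;

// Hypothetical helper, not part of Hudi: builds the properties that enable sample writes.
public class SampleWritesConfigSketch {

  static Properties sampleWritesProps(int sampleSize) {
    Properties props = new Properties();
    // Key names are taken from the HoodieWriteConfig diff in this PR.
    props.setProperty("hoodie.write.sample.writes.enabled", "true");
    props.setProperty("hoodie.write.sample.writes.size", String.valueOf(sampleSize));
    return props;
  }

  public static void main(String[] args) {
    // 2000 is the default in the reviewed revision of the diff.
    Properties props = sampleWritesProps(2000);
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

These properties would then be passed to the writer together with the usual table options.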
   
   ### Impact
   
   The first commit with the feature turned on will be slower, because it performs the extra sample write.
   
   ### Risk level
   
   Low.
   
   ### Documentation Update
   
   - [ ] Update docs about the new configs and usage
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1534075473

   > Let's move the sample writes call as early as possible, so that we construct the writeConfig with the avg record size overridden if need be. We don't want to mutate the write config.
   
   This would make write client creation depend on the input data. That is possible with Deltastreamer, where the client is created after fetching records from the source, but it is not possible, and seems like an anti-pattern, for the data source writer.




[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182577420


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));

Review Comment:
   I don't see a big issue with overwriting this config at this point, as we're actually fixing a value that wasn't set properly. I'm not sure exactly how to embed it in UpsertPartitioner, but I think this is a clean separation point at which to intercept and sample records. Besides, people may have set a custom partitioner.





[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184620783


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);

Review Comment:
   So, wouldn't this OOM if we try to bring all records into just 1 Spark partition? Or, since we do take(5k), is Spark efficient enough that it will not bring all records into 1 Spark partition, but just the 5k records?
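
The question above comes down to whether taking `n` elements from a lazily evaluated source forces the whole source to be read. The sketch below is a plain-Java analogy only, not Spark code: `CountingIterator` and `take` are hypothetical names, and it assumes the behaviour the comment hopes for, namely that a non-shuffling `coalesce(1)` followed by `take(n)` consumes the input as an iterator and stops early. Any shuffle stage upstream of `records` would still run in full, so this says nothing about the cost of producing the input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java analogy for "take(n) over a lazy source"; not Spark's implementation.
public class LazyTakeSketch {

  /** A lazy source of integers that counts how many elements were actually pulled. */
  static final class CountingIterator implements Iterator<Integer> {
    private final int limit;
    private int next = 0;
    int pulled = 0;

    CountingIterator(int limit) {
      this.limit = limit;
    }

    @Override
    public boolean hasNext() {
      return next < limit;
    }

    @Override
    public Integer next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      pulled++;
      return next++;
    }
  }

  /** Pulls at most n elements from the iterator and then stops reading. */
  static <T> List<T> take(Iterator<T> it, int n) {
    List<T> out = new ArrayList<>();
    while (out.size() < n && it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    CountingIterator source = new CountingIterator(1_000_000);
    List<Integer> samples = take(source, 5_000);
    // prints "5000 taken, 5000 pulled"
    System.out.println(samples.size() + " taken, " + source.pulled + " pulled");
  }
}
```

Only the sampled elements are held in memory here; the remaining 995,000 are never materialized.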





[GitHub] [hudi] xushiyan merged pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan merged PR #8390:
URL: https://github.com/apache/hudi/pull/8390




[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1160286007


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -656,6 +657,19 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to enable commit conflict checking or not during early "
           + "conflict detection.");
 
+  public static final ConfigProperty<Boolean> SAMPLE_WRITES_ENABLED = ConfigProperty
+      .key("hoodie.write.sample.writes.enabled")
+      .defaultValue(false)
+      .withDocumentation("Set this to true to sample from the first batch of records and write to the auxiliary path, before writing to the table. "
+          + "The sampled records are used to calculate the average record size. The relevant write client will have `" + COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key()
+          + "` being overwritten by the calculated result.");
+
+  public static final ConfigProperty<Integer> SAMPLE_WRITES_SIZE = ConfigProperty
+      .key("hoodie.write.sample.writes.size")
+      .defaultValue(2000)

Review Comment:
   We can make this 10k, just in case the added meta fields overshadow the original data size. I don't feel strongly about this suggestion, though.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);

Review Comment:
   How do we guarantee that this will not trigger the DAG twice?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);
+      sampleWriteClient.startCommitWithTime(instantTime);
+      JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+      if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {

Review Comment:
   We can fetch the size from WriteStatus right here, can't we? Why do we need to let the write succeed and then read the commit metadata explicitly?
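
Whichever source the numbers come from (write statuses or commit metadata), the estimate reduces to total bytes written divided by total records written. The following is a sketch under stated assumptions: `FileWriteStat` is a hypothetical stand-in for the per-file statistics, rounding up is an arbitrary choice, and this is not the PR's `getAvgSizeFromSampleWrites` implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the averaging step; not Hudi's actual code.
public class AvgRecordSizeSketch {

  /** Stand-in for per-file write statistics (bytes and record count). */
  static final class FileWriteStat {
    final long bytesWritten;
    final long recordsWritten;

    FileWriteStat(long bytesWritten, long recordsWritten) {
      this.bytesWritten = bytesWritten;
      this.recordsWritten = recordsWritten;
    }
  }

  /** Returns ceil(total bytes / total records), or the fallback when nothing was written. */
  static long averageRecordSize(List<FileWriteStat> stats, long fallback) {
    long bytes = 0;
    long records = 0;
    for (FileWriteStat s : stats) {
      bytes += s.bytesWritten;
      records += s.recordsWritten;
    }
    if (records <= 0) {
      return fallback;
    }
    // Integer ceiling, so a tiny sample never yields an estimate of zero.
    return (bytes + records - 1) / records;
  }

  public static void main(String[] args) {
    List<FileWriteStat> stats = Arrays.asList(
        new FileWriteStat(1000, 10),
        new FileWriteStat(2048, 20));
    // 3048 bytes over 30 records, rounded up: prints 102
    System.out.println(averageRecordSize(stats, 1024));
  }
}
```

The fallback keeps the previously configured estimate in play when the sample write produced no records.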



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }

Review Comment:
   Should we add a finally block and clean up the entire sample table that was written?
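
The shape of the suggested clean-up can be sketched with the local file system. This is an illustration only: the class and method names are hypothetical, the "sample write" is a dummy file, and it uses `java.nio.file` rather than the Hadoop `FileSystem` the real code works with (where a recursive `FileSystem.delete(path, true)` would presumably play the same role). The class Javadoc in the diff also notes that clean-up is tracked separately as a TODO (HUDI-6044).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of "sample, measure, always clean up"; not Hudi's actual code.
public class SampleCleanupSketch {

  /** Creates a scratch directory for the demonstration. */
  static Path newScratchDir() {
    try {
      return Files.createTempDirectory("sample-writes-sketch");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Deletes a directory tree, children before parents. */
  static void deleteRecursively(Path root) {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(root)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Writes a dummy 2048-byte "sample" of 16 records, returns the average size, and always cleans up. */
  static long sampleThenCleanUp(Path sampleDir) {
    try {
      Files.createDirectories(sampleDir);
      Path file = Files.write(sampleDir.resolve("sample.bin"), new byte[2048]);
      return Files.size(file) / 16;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      // Runs whether the sample write succeeded or failed.
      deleteRecursively(sampleDir);
    }
  }

  public static void main(String[] args) {
    Path dir = newScratchDir().resolve("sample_writes");
    System.out.println(sampleThenCleanUp(dir)); // prints 128
    System.out.println(Files.exists(dir));      // prints false
  }
}
```

Putting the delete in `finally` means a failed sample write does not leave a partial sample table behind.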





[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1498348925

   ## CI report:
   
   * 7c262e5538512cc26ffc01fa06b2225af147e9b8 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16152) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1498316610

   ## CI report:
   
   * 7c262e5538512cc26ffc01fa06b2225af147e9b8 UNKNOWN
   




[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184665421


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {

Review Comment:
   https://issues.apache.org/jira/browse/HUDI-6169





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1162269179


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);

Review Comment:
   This performs a full write (from source to target path) for the sample and hence triggers the DAG, which is unavoidable. It only does so for the sampled data, though.
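
The full write is what makes the estimate realistic: the class Javadoc says the average is obtained by reading the commit metadata of the sample write. A minimal sketch of that arithmetic follows, assuming the metadata exposes per-file byte and record counts. `FileWriteStat` and `avgRecordSize` are hypothetical stand-ins, not Hudi classes, and rounding up is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: FileWriteStat stands in for per-file commit statistics.
final class AvgSizeFromWriteStatsSketch {

  /** Bytes and records written to one file by the sample commit. */
  static final class FileWriteStat {
    final long bytesWritten;
    final long recordsWritten;

    FileWriteStat(long bytesWritten, long recordsWritten) {
      this.bytesWritten = bytesWritten;
      this.recordsWritten = recordsWritten;
    }
  }

  /** Average on-disk bytes per record across all files, rounded up. */
  static long avgRecordSize(List<FileWriteStat> stats) {
    long totalBytes = 0L;
    long totalRecords = 0L;
    for (FileWriteStat stat : stats) {
      totalBytes += stat.bytesWritten;
      totalRecords += stat.recordsWritten;
    }
    if (totalRecords == 0) {
      throw new IllegalStateException("Sample write produced no records");
    }
    return (long) Math.ceil((double) totalBytes / totalRecords);
  }

  public static void main(String[] args) {
    List<FileWriteStat> stats = Arrays.asList(
        new FileWriteStat(1000L, 10L),
        new FileWriteStat(501L, 5L));
    // 1501 bytes over 15 records, rounded up
    System.out.println(avgRecordSize(stats)); // prints 101
  }
}
```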





[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182808070


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {

Review Comment:
   Let's create a follow-up ticket.





[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182808377


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {

Review Comment:
   Sounds good.





[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1534270148

   ## CI report:
   
   * b9acf2307ac9cbaa2f96a7f48ec09d4be0899650 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16791) 
   * 2d9751572907b186dff5aa5bf3c8dc9c2138d936 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16820) 
   




[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182590639


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -656,6 +657,19 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to enable commit conflict checking or not during early "
           + "conflict detection.");
 
+  public static final ConfigProperty<Boolean> SAMPLE_WRITES_ENABLED = ConfigProperty
+      .key("hoodie.write.sample.writes.enabled")
+      .defaultValue(false)
+      .withDocumentation("Set this to true to sample from the first batch of records and write to the auxiliary path, before writing to the table. "
+          + "The sampled records are used to calculate the average record size. The relevant write client will have `" + COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key()
+          + "` being overwritten by the calculated result.");
+
+  public static final ConfigProperty<Integer> SAMPLE_WRITES_SIZE = ConfigProperty
+      .key("hoodie.write.sample.writes.size")
+      .defaultValue(2000)

Review Comment:
   Sounds fair to increase it, given this is a one-time write. To balance a bit, we can go with 5k; for big payload sizes, more sampling won't make much difference.
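
For reference, a minimal sketch of how a writer might opt in with the 5k sample size discussed here. The two keys are copied from this revision of the diff and may differ in the merged version. The helper `sampleWriteProps` is hypothetical, and 5000 is only the value suggested in this comment, not a confirmed default.

```java
import java.util.Properties;

// Hypothetical illustration only: builds plain properties, no Hudi classes involved.
final class SampleWritesConfigSketch {

  /** Returns writer properties that enable sample writes with the given sample size. */
  static Properties sampleWriteProps(int sampleSize) {
    Properties props = new Properties();
    // Keys as they appear in this revision of HoodieWriteConfig.
    props.setProperty("hoodie.write.sample.writes.enabled", "true");
    props.setProperty("hoodie.write.sample.writes.size", String.valueOf(sampleSize));
    return props;
  }

  public static void main(String[] args) {
    Properties props = sampleWriteProps(5000);
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```

In the diff above, such properties would reach the sample writer through `HoodieWriteConfig.newBuilder().withProps(...)`.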





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184523790


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);

Review Comment:
   This won't work: take() is an action that returns a List, so it has to come last in the chain.
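
To make the point concrete: in Spark's Java API, `JavaRDD.coalesce(int)` returns another `JavaRDD`, while `JavaRDD.take(int)` returns a `java.util.List`, so nothing RDD-specific can be chained after `take`. The sketch below is only an illustration. `TakeGoesLast` and `MiniRdd` are hypothetical stand-ins (not Spark or Hudi classes) that mimic those two return types so the example runs without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class TakeGoesLast {

  // Hypothetical stand-in for JavaRDD; it only models the return types involved.
  static final class MiniRdd<T> {
    private final List<List<T>> partitions;

    MiniRdd(List<List<T>> partitions) {
      this.partitions = partitions;
    }

    // Transformation: returns another MiniRdd, so further calls can be chained.
    // Only the single-partition case is modeled in this sketch.
    MiniRdd<T> coalesce(int numPartitions) {
      if (numPartitions != 1) {
        throw new IllegalArgumentException("this sketch only models coalesce(1)");
      }
      List<T> merged = new ArrayList<>();
      partitions.forEach(merged::addAll);
      return new MiniRdd<>(List.of(merged));
    }

    // Action: returns a plain List, which ends the chain.
    List<T> take(int n) {
      List<T> out = new ArrayList<>();
      for (List<T> partition : partitions) {
        for (T element : partition) {
          if (out.size() == n) {
            return out;
          }
          out.add(element);
        }
      }
      return out;
    }

    int numPartitions() {
      return partitions.size();
    }
  }

  public static void main(String[] args) {
    MiniRdd<Integer> records = new MiniRdd<>(List.of(List.of(1, 2), List.of(3, 4, 5)));
    // Transformation first, action last.
    List<Integer> samples = records.coalesce(1).take(3);
    System.out.println(samples); // prints [1, 2, 3]
    // records.take(3).coalesce(1) would not compile: java.util.List has no coalesce method.
  }
}
```

With this shape, `records.coalesce(1).take(size)` type-checks, while the reversed order fails at compile time because `List` has no `coalesce` method.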




[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1534259063

   ## CI report:
   
   * b9acf2307ac9cbaa2f96a7f48ec09d4be0899650 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16791) 
   * 2d9751572907b186dff5aa5bf3c8dc9c2138d936 UNKNOWN
   

[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1498587348

   ## CI report:
   
   * 7c262e5538512cc26ffc01fa06b2225af147e9b8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16152) 
   * 46591e6e4177f0007e25432d4140a579beb2629e UNKNOWN
   

[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1499108936

   ## CI report:
   
   * 46591e6e4177f0007e25432d4140a579beb2629e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16166) 
   

[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1498461821

   ## CI report:
   
   * 7c262e5538512cc26ffc01fa06b2225af147e9b8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16152) 
   

[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1177161536


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }

Review Comment:
   Sounds good.
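
For readers following the diff above: the new estimate comes from `getAvgSizeFromSampleWrites`, whose body is not quoted in this comment. One plausible way to turn write statistics into an average is bytes written divided by records written, rounded up. The sketch below assumes exactly that. `AvgRecordSizeSketch`, `avgRecordSize`, and the sample numbers are hypothetical and not taken from the PR.

```java
public class AvgRecordSizeSketch {

  // Hypothetical helper: average bytes per record, rounded up to a whole byte.
  // This is an assumption about the calculation, not the PR's implementation.
  static long avgRecordSize(long totalBytesWritten, long totalRecordsWritten) {
    if (totalRecordsWritten <= 0) {
      throw new IllegalArgumentException("need at least one written record");
    }
    return (long) Math.ceil((double) totalBytesWritten / totalRecordsWritten);
  }

  public static void main(String[] args) {
    // Made-up numbers: 5,000 sample records written as 435,000 bytes.
    System.out.println(avgRecordSize(435_000L, 5_000L)); // prints 87
  }
}
```

Rounding up rather than down is a design choice in this sketch: a slightly larger per-record estimate leads to fewer records packed per file, which errs on the side of smaller files.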




[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1531636692

   ## CI report:
   
   * 46591e6e4177f0007e25432d4140a579beb2629e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16166) 
   * b9acf2307ac9cbaa2f96a7f48ec09d4be0899650 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16791) 
   

[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1531895797

   ## CI report:
   
   * b9acf2307ac9cbaa2f96a7f48ec09d4be0899650 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16791) 
   

[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182976992


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);

Review Comment:
   Should we clean the directory before we try the sample write? What happens if the first attempt crashed while writing to SAMPLE_WRITES_FOLDER_PATH?
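
One detail of the quoted code that bears on this question: the sample-writes base path ends in `Instant.now().toEpochMilli()`, so a retry that starts at a later millisecond would target a different directory than a crashed first attempt. The sketch below illustrates that with plain string handling. `SampleWritesPathSketch` and `sampleWritesPath` are hypothetical names, the base path and timestamps are made up, and the `.hoodie/.aux/.sample_writes` literal is copied from the class Javadoc rather than from the actual `SAMPLE_WRITES_FOLDER_PATH` constant.

```java
public class SampleWritesPathSketch {

  // Hypothetical helper mirroring the layout described in the class Javadoc:
  // <base>/.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>
  static String sampleWritesPath(String basePath, String instantTime, long epochMillis) {
    return String.join("/", basePath, ".hoodie/.aux/.sample_writes", instantTime, Long.toString(epochMillis));
  }

  public static void main(String[] args) {
    String instantTime = "20230406000338"; // made-up instant time
    String firstAttempt = sampleWritesPath("/tmp/tbl", instantTime, 1680739418000L);
    String retry = sampleWritesPath("/tmp/tbl", instantTime, 1680739419500L);
    System.out.println(firstAttempt);
    System.out.println(retry);
    // Same instant time, different epoch millis: the two attempts do not share a directory.
    System.out.println(firstAttempt.equals(retry)); // prints false
  }
}
```

Distinct directories avoid collisions, but leftovers from a failed attempt would still remain until cleaned up, which the TODO in the class Javadoc tracks as HUDI-6044.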
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -367,6 +368,7 @@ object HoodieSparkSqlWriter {
                 hoodieRecords
               }
             client.startCommitWithTime(instantTime, commitActionType)
+            SparkSampleWritesUtils.overwriteRecordSizeEstimateIfNeeded(jsc, hoodieRecords, client.getConfig, instantTime)

Review Comment:
   Synced up offline. We can keep it at the higher layer.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);

Review Comment:
   Shouldn't we do records.take(size) and then coalesce()?
   ```
   records.take(size).coalesce(1)
   ```
   




[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184626993


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);

Review Comment:
   Never mind, I misread. This will be fixed.





[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1498595646

   ## CI report:
   
   * 7c262e5538512cc26ffc01fa06b2225af147e9b8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16152) 
   * 46591e6e4177f0007e25432d4140a579beb2629e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1162271514


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }

Review Comment:
   The design is to hook the clean-up into the rollback/insert overwrite lifecycle; the follow-up is tracked in https://issues.apache.org/jira/browse/HUDI-6044. The current implementation segregates different sample writes by commit and timestamp. The idea is to keep them for inspection and clean them up along with rollback/insert overwrite actions.





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1182594955


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {

Review Comment:
   This `isTimelineNonEmpty()` API filters on completed instants.
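
To illustrate why filtering on completed instants matters here, the following is a minimal sketch with hypothetical types (`TimelineCheck`, `InstantEntry`, `State`), not Hudi's actual timeline classes. It assumes that a timeline holding only requested or inflight instants should still be treated as empty, so that a first commit in progress does not suppress sample writes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for a timeline; not Hudi's HoodieInstant or HoodieTimeline.
class TimelineCheck {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  static final class InstantEntry {
    final String time;
    final State state;

    InstantEntry(String time, State state) {
      this.time = time;
      this.state = state;
    }
  }

  // Returns true only when at least one instant has completed.
  static boolean hasCompletedInstants(List<InstantEntry> timeline) {
    return timeline.stream().anyMatch(i -> i.state == State.COMPLETED);
  }

  public static void main(String[] args) {
    List<InstantEntry> firstCommitInFlight = Collections.singletonList(
        new InstantEntry("20230406000338", State.INFLIGHT));
    List<InstantEntry> afterFirstCommit = Arrays.asList(
        new InstantEntry("20230406000338", State.COMPLETED),
        new InstantEntry("20230406010000", State.REQUESTED));
    System.out.println(hasCompletedInstants(firstCommitInFlight)); // prints false
    System.out.println(hasCompletedInstants(afterFirstCommit));    // prints true
  }
}
```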





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184522344


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);

Review Comment:
   The base path also contains the current timestamp.
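
As a rough sketch of the layout described in the class Javadoc (`.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>`), the snippet below builds such a path with plain string joining. `SampleWritesPath` and its `build` method are hypothetical helpers for illustration; the PR itself uses `CachingPath` and `SAMPLE_WRITES_FOLDER_PATH`.

```java
import java.time.Instant;

// Hypothetical helper; the PR builds this path with CachingPath instead.
class SampleWritesPath {

  // Folder layout taken from the class Javadoc in the diff above.
  static final String SAMPLE_WRITES_FOLDER = ".hoodie/.aux/.sample_writes";

  // Joins table base path, sample-writes folder, instant time, and epoch millis.
  static String build(String tableBasePath, String instantTime, long epochMillis) {
    return String.join("/", tableBasePath, SAMPLE_WRITES_FOLDER, instantTime, String.valueOf(epochMillis));
  }

  public static void main(String[] args) {
    long now = Instant.now().toEpochMilli();
    // Two attempts for the same instant get distinct directories
    // because the epoch-millis component differs.
    System.out.println(build("/tmp/my_table", "20230406000338", now));
    System.out.println(build("/tmp/my_table", "20230406000338", now + 1));
  }
}
```

Because the timestamp is the last path component, a retried sample write for the same instant time does not collide with an earlier attempt.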





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1162277169


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);
+      sampleWriteClient.startCommitWithTime(instantTime);
+      JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+      if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {

Review Comment:
   The question is: if the sample write failed, can we still rely on and use the stats? It's a design choice. I prefer to ensure the end-to-end write cycle finished properly. Open to discussing different options.
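
The design choice above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the PR's code: `SampleWriteEstimate` and its parameters are hypothetical, and the averaging rule (total bytes written divided by total records written, rounded up) is assumed rather than taken from the diff.

```java
import java.util.OptionalLong;

// Hypothetical sketch; names and the rounding rule are assumptions.
class SampleWriteEstimate {

  // Yields an average record size only when the sample write finished
  // without errors and actually wrote records; otherwise yields empty
  // so the caller keeps its existing estimate.
  static OptionalLong avgRecordSize(long errorCount, long totalBytesWritten, long totalRecordsWritten) {
    if (errorCount > 0 || totalRecordsWritten <= 0) {
      return OptionalLong.empty();
    }
    return OptionalLong.of((long) Math.ceil((double) totalBytesWritten / totalRecordsWritten));
  }

  public static void main(String[] args) {
    System.out.println(avgRecordSize(0, 1000, 3)); // prints OptionalLong[334]
    System.out.println(avgRecordSize(2, 1000, 3)); // prints OptionalLong.empty
  }
}
```

Returning an empty value on failure keeps the fallback explicit: the caller decides whether to use the default estimate.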





[GitHub] [hudi] xushiyan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1184666864


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));

Review Comment:
   The PR is updated to keep the write config immutable.
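
One way to avoid mutating a config is to derive a copy that carries the override. The sketch below shows this with `java.util.Properties`. `ConfigOverride` is a hypothetical helper, and the key string is assumed to correspond to `COPY_ON_WRITE_RECORD_SIZE_ESTIMATE`. It does not show how the updated PR implements this.

```java
import java.util.Properties;

// Hypothetical helper; not the approach taken in the updated PR.
class ConfigOverride {

  // Assumed key for COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.
  static final String RECORD_SIZE_ESTIMATE_KEY = "hoodie.copyonwrite.record.size.estimate";

  // Returns a new Properties with the estimate overridden; the input is left untouched.
  static Properties withRecordSizeEstimate(Properties original, long avgSize) {
    Properties copy = new Properties();
    copy.putAll(original);
    copy.setProperty(RECORD_SIZE_ESTIMATE_KEY, String.valueOf(avgSize));
    return copy;
  }

  public static void main(String[] args) {
    Properties original = new Properties();
    original.setProperty(RECORD_SIZE_ESTIMATE_KEY, "1024");
    Properties overridden = withRecordSizeEstimate(original, 334);
    System.out.println(original.getProperty(RECORD_SIZE_ESTIMATE_KEY));   // prints 1024
    System.out.println(overridden.getProperty(RECORD_SIZE_ESTIMATE_KEY)); // prints 334
  }
}
```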





[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1534549058

   ## CI report:
   
   * 2d9751572907b186dff5aa5bf3c8dc9c2138d936 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16820) 
   




[GitHub] [hudi] hudi-bot commented on pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8390:
URL: https://github.com/apache/hudi/pull/8390#issuecomment-1531621562

   ## CI report:
   
   * 46591e6e4177f0007e25432d4140a579beb2629e Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16166) 
   * b9acf2307ac9cbaa2f96a7f48ec09d4be0899650 UNKNOWN
   




[GitHub] [hudi] nsivabalan commented on a diff in pull request #8390: [HUDI-5315] Use sample writes to estimate record size

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8390:
URL: https://github.com/apache/hudi/pull/8390#discussion_r1177162896


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {

Review Comment:
   Should we check for completed commits instead? What if the timeline contains only failed commits?
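
A minimal sketch of the distinction raised here, using hypothetical stand-in types rather than Hudi's timeline classes. It assumes `isTimelineNonEmpty()` counts every instant regardless of state; that is only one possible reading, since its implementation is not shown in this diff.

```java
import java.util.List;

// Hypothetical stand-ins for timeline instants; these are NOT Hudi's classes.
final class TimelineGate {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  static final class TimelineInstant {
    final String time;
    final State state;

    TimelineInstant(String time, State state) {
      this.time = time;
      this.state = state;
    }
  }

  // Assumed reading of the check in the diff: any instant at all disables sampling.
  static boolean shouldSampleIfTimelineEmpty(List<TimelineInstant> timeline) {
    return timeline.isEmpty();
  }

  // Suggested alternative: only completed commits disable sampling, so a timeline
  // that holds nothing but failed (never completed) commits still gets sampled.
  static boolean shouldSampleIfNoCompletedCommits(List<TimelineInstant> timeline) {
    return timeline.stream().noneMatch(i -> i.state == State.COMPLETED);
  }
}
```

With a timeline that holds a single inflight instant, the first check skips sampling while the second still samples, which is the case this comment asks about.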
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to error when doing sample writes.", writeConfig.getTableName()), e);
+    }
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    long now = Instant.now().toEpochMilli();
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime + Path.SEPARATOR + now);
+    final String sampleWritesBasePath = basePath.toString();
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s_%s", writeConfig.getTableName(), instantTime, now))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(writeConfig.getProps())
+        .withPath(sampleWritesBasePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSampleWritesEnabled(false)
+        .withTableServicesEnabled(false)
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {

Review Comment:
   Can we generalize this so that Flink can leverage it as well?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -367,6 +368,7 @@ object HoodieSparkSqlWriter {
                 hoodieRecords
               }
             client.startCommitWithTime(instantTime, commitActionType)
+            SparkSampleWritesUtils.overwriteRecordSizeEstimateIfNeeded(jsc, hoodieRecords, client.getConfig, instantTime)

Review Comment:
   Should we call this in UpsertPartitioner, where we try to determine the average record size, so that all writers can leverage it? As of now, we call it explicitly from both the Spark SQL writer and Deltastreamer.
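
Wherever the call ends up, the value the partitioner needs is a single bytes-per-record figure. A rough sketch of how it could be derived from the sample commit's totals, assuming the average is total bytes written divided by total records written, with a fallback when the sample wrote nothing. The class and method names are hypothetical, and the PR's `getAvgSizeFromSampleWrites` body is not shown in this excerpt.

```java
// Hypothetical helper; names are illustrative and not taken from the PR.
final class RecordSizeEstimator {

  // Returns the average bytes per record from the sample write totals,
  // or the fallback estimate when the sample produced no usable totals.
  static long averageRecordSize(long totalBytesWritten, long totalRecordsWritten, long fallbackEstimate) {
    if (totalBytesWritten <= 0 || totalRecordsWritten <= 0) {
      return fallbackEstimate;
    }
    // Round up so a fractional average never under-estimates the record size.
    return (long) Math.ceil((double) totalBytesWritten / totalRecordsWritten);
  }
}
```

A single helper like this could be shared by every writer path, which is the point of moving the lookup next to where the average record size is determined.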



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSampleWritesUtils.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.config.HoodieWriteConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * The utilities class is dedicated to estimating average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ *
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static void overwriteRecordSizeEstimateIfNeeded(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return;
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate due to timeline is non-empty.");
+      return;
+    }
+    try {
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        writeConfig.setValue(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE, String.valueOf(avgSize));

Review Comment:
   Generally, our writeConfig is immutable. If we embed the sample-writes call within UpsertPartitioner, we can avoid mutating it.
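
As an illustration of the immutability concern, the override could also be applied to a copy of the properties rather than to the caller's config. This is only a sketch with plain `java.util.Properties`. The class name is hypothetical, and the key string is assumed to be the one behind `COPY_ON_WRITE_RECORD_SIZE_ESTIMATE`.

```java
import java.util.Properties;

// Hypothetical helper showing a copy-on-override pattern; not the PR's code.
final class ConfigOverride {

  // Assumed key for the record size estimate config.
  static final String RECORD_SIZE_KEY = "hoodie.copyonwrite.record.size.estimate";

  // Returns a new Properties carrying the override; the original is left untouched.
  static Properties withRecordSizeEstimate(Properties original, long avgSize) {
    Properties copy = new Properties();
    copy.putAll(original);
    copy.setProperty(RECORD_SIZE_KEY, String.valueOf(avgSize));
    return copy;
  }
}
```

The caller keeps its original config, and only the component that needs the sampled estimate sees the derived copy.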


