You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/16 09:55:00 UTC

[GitHub] [hudi] YuweiXiao commented on a diff in pull request #5416: [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency

YuweiXiao commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r876927787


##########
hudi-common/pom.xml:
##########
@@ -271,5 +271,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>3.4.2</version>

Review Comment:
   Is it better to define a version variable in project main pom? And put dependency declaration to main pom, too



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession}
+
+import scala.util.Random
+
+object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
+
+  protected val spark: SparkSession = getSparkSession
+
+  val recordNumber = 1000000
+
+  def getSparkSession: SparkSession = SparkSession.builder()
+    .master("local[*]")
+    .appName(this.getClass.getCanonicalName)
+    .withExtensions(new HoodieSparkSessionExtension)
+    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    .config("spark.sql.session.timeZone", "CTT")
+    .config(sparkConf())
+    .getOrCreate()
+
+  def sparkConf(): SparkConf = {
+    val sparkConf = new SparkConf()
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      sparkConf.set("spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+    }
+    sparkConf
+  }
+
+  private def createDataFrame(number: Int): DataFrame = {
+    val schema = new StructType()
+      .add("c1", IntegerType)
+      .add("c2", StringType)
+
+    val rdd = spark.sparkContext.parallelize(0 to number, 2).map { item =>
+      val c1 = Integer.valueOf(item)
+      val c2 = s"abc"
+      RowFactory.create(c1, c2)
+    }
+    spark.createDataFrame(rdd, schema)
+  }
+
+  /**
+   * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
+   * COW Ingestion:                            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * ------------------------------------------------------------------------------------------------------------------------
+   * BoundInMemory Executor                             5557           5607          70          0.2        5556.9       1.0X
+   * Disruptor Executor                                 2758           2778          28          0.4        2757.7       2.0X
+   */
+  private def cowTableDisruptorExecutorBenchmark(tableName: String = "executorBenchmark"): Unit = {
+    val df = createDataFrame(recordNumber)

Review Comment:
   Could we put dataframe construction into the benchmark function, to avoid potential lazy construction of data frame and sharing of cached result, though benchmark results will also include the costs of source data construction.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import org.apache.hudi.exception.HoodieException;
+
+public class WaitStrategyFactory {
+
+  public static final String DEFAULT_STRATEGY = "BlockingWaitStrategy";
+
+  /**
+   * Build WaitStrategy for disruptor
+   */
+  public static WaitStrategy build(String name) {
+    WaitStrategy waitStrategy = null;
+    if ("BlockingWaitStrategy".equals(name)) {

Review Comment:
   Could we make these wait strategy as enum?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.dsl.Disruptor;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DisruptorExecutor<I, O, E> extends HoodieExecutor<I, O, E> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class);
+
+  // Executor service used for launching write thread.
+  private final ExecutorService producerExecutorService;
+  // Executor service used for launching read thread.
+  private final ExecutorService consumerExecutorService;
+  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
+  private final DisruptorMessageQueue<I, O> queue;
+  // Producers
+  private final List<DisruptorBasedProducer<I>> producers;
+  // Consumer
+  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
+  // pre-execute function to implement environment specific behavior before executors (producers/consumer) run
+  private final Runnable preExecuteRunnable;
+
+  public DisruptorExecutor(final int bufferSize, final Iterator<I> inputItr,
+                           BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedDisruptorProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, DisruptorBasedProducer<I> producer,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, WaitStrategyFactory.DEFAULT_STRATEGY, Functions.noop());
+  }
+
+  public DisruptorExecutor(final int bufferSize, DisruptorBasedProducer<I> producer,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, List<DisruptorBasedProducer<I>> producers,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
+                           final String waitStrategy, Runnable preExecuteRunnable) {
+    this.producers = producers;
+    this.consumer = consumer;
+    this.preExecuteRunnable = preExecuteRunnable;
+    // Ensure fixed thread for each producer thread
+    this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer"));
+    // Ensure single thread for consumer
+    this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer"));
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public ExecutorCompletionService<Boolean> startProducers() {
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<Boolean>(producerExecutorService);
+    producers.stream().map(producer -> {
+      return completionService.submit(() -> {
+        try {
+          preExecuteRunnable.run();
+
+          DisruptorPublisher publisher = new DisruptorPublisher<>(producer, queue);
+          publisher.startProduce();
+
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw e;
+        }
+        return true;
+      });
+    }).collect(Collectors.toList());
+    return completionService;
+  }
+
+  @Override
+  public E execute() {

Review Comment:
   Could we unify the execute method implementation to base class for DisruptorExecutor & original bounded Executor? e.g., purely rely on consumer finish states.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java:
##########
@@ -77,24 +81,43 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
   @Override
   protected List<WriteStatus> computeNext() {
     // Executor service used for launching writer thread.
-    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
-        null;
+    HoodieExecutor<?, ?, List<WriteStatus>> executor = null;
     try {
       Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       if (useWriterSchema) {
         schema = HoodieAvroUtils.addMetadataFields(schema);
       }
-      bufferedIteratorExecutor =
-          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
+
+      String executorType = hoodieConfig.getExecutorType();

Review Comment:
   We cloud put executorType -> executorTypeEnum conversion logic into the hooideConfig.getExecutorType(), to have more clear logic here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org