You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/10 22:15:39 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5416: [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency

alexeykudinkin commented on code in PR #5416:
URL: https://github.com/apache/hudi/pull/5416#discussion_r991641365


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2274,6 +2306,11 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
       return this;
     }
 
+    public Builder withExecutorName(String executorClass) {

Review Comment:
   Should this be `withExecutorType`?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.DisruptorMessageHandler;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.DisruptorPublisher;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        e.printStackTrace();

Review Comment:
   Don't swallow exceptions, re-throw instead



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.DisruptorMessageHandler;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.DisruptorPublisher;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(16);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record)
+                  .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+    };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(100, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Test to ensure that we are reading all records from queue iterator when we have multiple producers.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testCompositeProducerRecordReading() throws Exception {
+    final int numRecords = 1000;
+    final int numProducers = 40;
+    final List<List<HoodieRecord>> recs = new ArrayList<>();
+
+    final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new DisruptorMessageQueue(1024, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            "BLOCKING_WAIT", numProducers, new Runnable() {
+              @Override
+          public void run() {
+                // do nothing.
+              }
+            });
+
+    // Record Key to <Producer Index, Rec Index within a producer>
+    Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
+
+    for (int i = 0; i < numProducers; i++) {
+      List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);
+      int j = 0;
+      for (HoodieRecord r : pRecs) {
+        assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
+        keyToProducerAndIndexMap.put(r.getRecordKey(), new Tuple2<>(i, j));
+        j++;
+      }
+      recs.add(pRecs);
+    }
+
+    List<DisruptorPublisher> disruptorPublishers = new ArrayList<>();
+    for (int i = 0; i < recs.size(); i++) {
+      final List<HoodieRecord> r = recs.get(i);
+      // Alternate between pull and push based iterators
+      if (i % 2 == 0) {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new IteratorBasedQueueProducer<>(r.iterator()), queue);
+        disruptorPublishers.add(publisher);
+      } else {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new FunctionBasedQueueProducer<>((buf) -> {
+          Iterator<HoodieRecord> itr = r.iterator();
+          while (itr.hasNext()) {
+            try {
+              buf.insertRecord(itr.next());
+            } catch (Exception e) {
+              throw new HoodieException(e);
+            }
+          }
+          return true;
+        }), queue);
+        disruptorPublishers.add(publisher);
+      }
+    }
+
+    // Used to ensure that consumer sees the records generated by a single producer in FIFO order
+    Map<Integer, Integer> lastSeenMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
+    Map<Integer, Integer> countMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
+
+
+    // setup consumer and start disruptor
+    DisruptorMessageHandler<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> handler =
+        new DisruptorMessageHandler<>(new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
+            // Read recs and ensure we have covered all producer recs.
+            final HoodieRecord rec = payload.record;
+            Tuple2<Integer, Integer> producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey());
+            Integer lastSeenPos = lastSeenMap.get(producerPos._1());
+            countMap.put(producerPos._1(), countMap.get(producerPos._1()) + 1);
+            lastSeenMap.put(producerPos._1(), lastSeenPos + 1);
+            // Ensure we are seeing the next record generated
+            assertEquals(lastSeenPos + 1, producerPos._2().intValue());

Review Comment:
   Can we instead of asserting in a loop, collect all records per executor and then assert ordering in these lists?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -132,6 +134,14 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
           + "extract a key out of incoming records.");
 
+  public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
+      .key("hoodie.write.executor.type")
+      .defaultValue("BOUNDED_IN_MEMORY")

Review Comment:
   I believe @nsivabalan called this out previous, but don't see his comment anymore: let's create an enum with fixed values and use it here 



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieDisruptorEventFactory.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventFactory;
+
+/**
+ * HoodieDisruptorEventFactory is used to create/preallocate HoodieDisruptorEvent.
+ *
+ */
+public class HoodieDisruptorEventFactory<O> implements EventFactory<HoodieDisruptorEvent<O>> {

Review Comment:
   Same comment as above (let's make it impl detail of DisruptorQueue)



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieDaemonThreadFactory.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Access to a ThreadFactory instance.
+ * 1. All threads are created with setDaemon(true).
+ * 2. All threads execute preExecuteRunnable func once.
+ */
+public class HoodieDaemonThreadFactory implements ThreadFactory {
+
+  private final Runnable preExecuteRunnable;
+  private final AtomicInteger threadsNum = new AtomicInteger();
+  private final String namePattern;
+  private final String baseName = "Hoodie-disruptor-daemon-thread";

Review Comment:
   Let's make template configurable (since ThreadFactory isn't coupled to Disruptor in any way)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1040,6 +1064,14 @@ public int getWriteBufferLimitBytes() {
     return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
   }
 
+  public String getWriteWaitStrategy() {

Review Comment:
    - nit: `getWriteExecutorWaitStrategy`
    - Let's make it return `Option` (since config is optional)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/MessageQueueFactory.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.ExecutorType;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+public class MessageQueueFactory {

Review Comment:
   nit: Name is misleading here, let's ref executor in the name (like for ex `QueueBasedExecutorFactory`)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java:
##########
@@ -68,25 +68,26 @@ private Runnable getPreExecuteRunnable() {
     return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
   }
 
+  // common produce and consume based on disruptor executor

Review Comment:
   Do we need this comment?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1040,6 +1064,14 @@ public int getWriteBufferLimitBytes() {
     return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
   }
 
+  public String getWriteWaitStrategy() {
+    return getString(WRITE_WAIT_STRATEGY);
+  }
+
+  public int getWriteBufferSize() {

Review Comment:
   Similar comment to above one (this one will actually cause NPE, since we're returning primitive type)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;

Review Comment:
   Let's actually instead collect all records and assert that consumed records are identical to produced ones (we want to assert there's no tampering, and that the ordering is preserved)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records

Review Comment:
   Comment is out of date



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {

Review Comment:
   There's no need to override in that case (just adds boilerplate)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void testInterruptExecutor() {
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ExecutorService pool = Executors.newSingleThreadExecutor();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(1024);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            try {
+              while (true) {
+                Thread.sleep(1000);
+              }
+            } catch (InterruptedException ie) {
+              return;
+            }
+          }
+
+          @Override
+          public void finish() {

Review Comment:
   Same comment as above



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void testInterruptExecutor() {
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ExecutorService pool = Executors.newSingleThreadExecutor();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(1024);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            try {
+              while (true) {
+                Thread.sleep(1000);
+              }
+            } catch (InterruptedException ie) {
+              return;
+            }
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
+    AtomicReference<Exception> actualException = new AtomicReference<>();
+    try {
+      executor = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;

Review Comment:
   No need for another var, just drop `= null`



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.DisruptorMessageHandler;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.DisruptorPublisher;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(16);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record)
+                  .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              e.printStackTrace();

Review Comment:
   Same comment as above, please ditto across the board



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void testInterruptExecutor() {
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ExecutorService pool = Executors.newSingleThreadExecutor();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(1024);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            try {
+              while (true) {
+                Thread.sleep(1000);

Review Comment:
   Please avoid using sleeps in tests (it's a time-bomb). Instead just park the thread on a lock/latch/etc



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.DisruptorMessageHandler;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.DisruptorPublisher;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(16);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record)
+                  .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+    };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(100, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Test to ensure that we are reading all records from queue iterator when we have multiple producers.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testCompositeProducerRecordReading() throws Exception {
+    final int numRecords = 1000;
+    final int numProducers = 40;
+    final List<List<HoodieRecord>> recs = new ArrayList<>();
+
+    final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new DisruptorMessageQueue(1024, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            "BLOCKING_WAIT", numProducers, new Runnable() {
+              @Override
+          public void run() {
+                // do nothing.
+              }
+            });
+
+    // Record Key to <Producer Index, Rec Index within a producer>
+    Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
+
+    for (int i = 0; i < numProducers; i++) {
+      List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);
+      int j = 0;
+      for (HoodieRecord r : pRecs) {
+        assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
+        keyToProducerAndIndexMap.put(r.getRecordKey(), new Tuple2<>(i, j));
+        j++;
+      }
+      recs.add(pRecs);
+    }
+
+    List<DisruptorPublisher> disruptorPublishers = new ArrayList<>();
+    for (int i = 0; i < recs.size(); i++) {
+      final List<HoodieRecord> r = recs.get(i);
+      // Alternate between pull and push based iterators
+      if (i % 2 == 0) {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new IteratorBasedQueueProducer<>(r.iterator()), queue);
+        disruptorPublishers.add(publisher);
+      } else {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new FunctionBasedQueueProducer<>((buf) -> {
+          Iterator<HoodieRecord> itr = r.iterator();
+          while (itr.hasNext()) {
+            try {
+              buf.insertRecord(itr.next());
+            } catch (Exception e) {
+              throw new HoodieException(e);
+            }
+          }
+          return true;
+        }), queue);
+        disruptorPublishers.add(publisher);
+      }
+    }
+
+    // Used to ensure that consumer sees the records generated by a single producer in FIFO order
+    Map<Integer, Integer> lastSeenMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
+    Map<Integer, Integer> countMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
+
+
+    // setup consumer and start disruptor
+    DisruptorMessageHandler<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> handler =
+        new DisruptorMessageHandler<>(new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
+            // Read recs and ensure we have covered all producer recs.
+            final HoodieRecord rec = payload.record;
+            Tuple2<Integer, Integer> producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey());

Review Comment:
   Let's use Hudi's Pair instead of Scala's Tuples



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128);
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(8);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+        };
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void testInterruptExecutor() {
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ExecutorService pool = Executors.newSingleThreadExecutor();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(1024);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            try {
+              while (true) {
+                Thread.sleep(1000);
+              }
+            } catch (InterruptedException ie) {
+              return;
+            }
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
+    AtomicReference<Exception> actualException = new AtomicReference<>();
+    try {
+      executor = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
+
+      Future<?> future = pool.submit(() -> {
+        try {
+          finalExecutor.execute();
+        } catch (Exception e) {
+          actualException.set(e);
+        }
+
+      });
+      future.cancel(true);
+      future.get();
+      assertTrue(actualException.get() instanceof HoodieException);

Review Comment:
   Instead of that please use `assertThrows` utility



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java:
##########
@@ -46,68 +43,52 @@
  * class takes as input the size limit, queue producer(s), consumer and transformer and exposes API to orchestrate
  * concurrent execution of these actors communicating through a central bounded queue
  */
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
 
   private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
-  private static final long TERMINATE_WAITING_TIME_SECS = 60L;
-  // Executor service used for launching write thread.
-  private final ExecutorService producerExecutorService;
-  // Executor service used for launching read thread.
-  private final ExecutorService consumerExecutorService;
   // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
   private final BoundedInMemoryQueue<I, O> queue;
-  // Producers
-  private final List<BoundedInMemoryQueueProducer<I>> producers;
-  // Consumer
-  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
-  // pre-execute function to implement environment specific behavior before executors (producers/consumer) run
-  private final Runnable preExecuteRunnable;
+  private final HoodieExecutorBase<I, O, E> hoodieExecutorBase;

Review Comment:
   Why aggregating and not extending?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DisruptorExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class);
+
+  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
+  private final DisruptorMessageQueue<I, O> queue;
+  private final HoodieExecutorBase<I, O, E> hoodieExecutorBase;
+
+  public DisruptorExecutor(final int bufferSize, final Iterator<I> inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, WaitStrategyFactory.DEFAULT_STRATEGY, Functions.noop());
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
+                           final String waitStrategy, Runnable preExecuteRunnable) {
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable);
+    this.hoodieExecutorBase = new HoodieExecutorBase<>(producers, consumer, preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public ExecutorCompletionService<Boolean> startProducers() {
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<Boolean>(hoodieExecutorBase.getProducerExecutorService());
+    hoodieExecutorBase.getProducers().stream().map(producer -> {
+      return completionService.submit(() -> {
+        try {
+          hoodieExecutorBase.getPreExecuteRunnable().run();

Review Comment:
   Let's embed this invocation into the executor itself (by creating a wrapper, so that there's no need to invoke it every time)



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DisruptorExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class);
+
+  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
+  private final DisruptorMessageQueue<I, O> queue;
+  private final HoodieExecutorBase<I, O, E> hoodieExecutorBase;
+
+  public DisruptorExecutor(final int bufferSize, final Iterator<I> inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, WaitStrategyFactory.DEFAULT_STRATEGY, Functions.noop());
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
+                           final String waitStrategy, Runnable preExecuteRunnable) {
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable);
+    this.hoodieExecutorBase = new HoodieExecutorBase<>(producers, consumer, preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public ExecutorCompletionService<Boolean> startProducers() {
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<Boolean>(hoodieExecutorBase.getProducerExecutorService());
+    hoodieExecutorBase.getProducers().stream().map(producer -> {
+      return completionService.submit(() -> {
+        try {
+          hoodieExecutorBase.getPreExecuteRunnable().run();
+
+          DisruptorPublisher publisher = new DisruptorPublisher<>(producer, queue);
+          publisher.startProduce();
+
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw new HoodieException("Error producing records in disruptor executor", e);
+        }
+        return true;
+      });
+    }).collect(Collectors.toList());

Review Comment:
   There's no need to do collect, let's do `forEach` instead



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.DisruptorMessageHandler;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.DisruptorPublisher;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+  private final String instantTime = HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() throws Exception {
+
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    when(hoodieWriteConfig.getWriteBufferSize()).thenReturn(16);
+    IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer =
+        new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          private int count = 0;
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record)
+                  .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return count;
+          }
+    };
+
+    DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new DisruptorExecutor(hoodieWriteConfig.getWriteBufferSize(), hoodieRecords.iterator(), consumer,
+          getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(100, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRemaining());
+
+      assertEquals(beforeRecord, afterRecord);
+      assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Test to ensure that we are reading all records from queue iterator when we have multiple producers.
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testCompositeProducerRecordReading() throws Exception {
+    final int numRecords = 1000;
+    final int numProducers = 40;
+    final List<List<HoodieRecord>> recs = new ArrayList<>();
+
+    final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+        new DisruptorMessageQueue(1024, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+            "BLOCKING_WAIT", numProducers, new Runnable() {
+              @Override
+          public void run() {
+                // do nothing.
+              }
+            });
+
+    // Record Key to <Producer Index, Rec Index within a producer>
+    Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
+
+    for (int i = 0; i < numProducers; i++) {
+      List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords);
+      int j = 0;
+      for (HoodieRecord r : pRecs) {
+        assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
+        keyToProducerAndIndexMap.put(r.getRecordKey(), new Tuple2<>(i, j));
+        j++;
+      }
+      recs.add(pRecs);
+    }
+
+    List<DisruptorPublisher> disruptorPublishers = new ArrayList<>();
+    for (int i = 0; i < recs.size(); i++) {
+      final List<HoodieRecord> r = recs.get(i);
+      // Alternate between pull and push based iterators
+      if (i % 2 == 0) {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new IteratorBasedQueueProducer<>(r.iterator()), queue);
+        disruptorPublishers.add(publisher);
+      } else {
+        DisruptorPublisher publisher = new DisruptorPublisher<>(new FunctionBasedQueueProducer<>((buf) -> {
+          Iterator<HoodieRecord> itr = r.iterator();
+          while (itr.hasNext()) {
+            try {
+              buf.insertRecord(itr.next());
+            } catch (Exception e) {
+              throw new HoodieException(e);
+            }
+          }
+          return true;
+        }), queue);
+        disruptorPublishers.add(publisher);
+      }
+    }
+
+    // Used to ensure that consumer sees the records generated by a single producer in FIFO order
+    Map<Integer, Integer> lastSeenMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
+    Map<Integer, Integer> countMap =
+        IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
+
+
+    // setup consumer and start disruptor
+    DisruptorMessageHandler<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> handler =
+        new DisruptorMessageHandler<>(new IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() {
+
+          @Override
+          public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) {
+            // Read recs and ensure we have covered all producer recs.
+            final HoodieRecord rec = payload.record;
+            Tuple2<Integer, Integer> producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey());
+            Integer lastSeenPos = lastSeenMap.get(producerPos._1());
+            countMap.put(producerPos._1(), countMap.get(producerPos._1()) + 1);
+            lastSeenMap.put(producerPos._1(), lastSeenPos + 1);
+            // Ensure we are seeing the next record generated
+            assertEquals(lastSeenPos + 1, producerPos._2().intValue());
+          }
+
+          @Override
+          public void finish() {
+          }
+
+          @Override
+          protected Integer getResult() {
+            return 0;
+          }
+        });
+
+
+    queue.setHandlers(handler);
+    queue.start();
+
+
+    // start to produce records
+    final List<Future<Boolean>> futureList = disruptorPublishers.stream().map(disruptorPublisher -> {
+      return executorService.submit(() -> {
+        disruptorPublisher.startProduce();
+        return true;
+      });
+    }).collect(Collectors.toList());
+
+    // Close queue
+    Future<Boolean> closeFuture = executorService.submit(() -> {

Review Comment:
   nit: Instead we can use `ForkJoinPool` and join `CompletableFuture`s



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageHandler.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventHandler;
+
+public class DisruptorMessageHandler<O, E> implements EventHandler<HoodieDisruptorEvent<O>> {
+
+  private IteratorBasedQueueConsumer<O, E> consumer;
+
+  public DisruptorMessageHandler(IteratorBasedQueueConsumer<O, E> consumer) {
+    this.consumer = consumer;
+  }
+
+  @Override
+  public void onEvent(HoodieDisruptorEvent<O> event, long sequence, boolean endOfBatch) {
+    consumer.consumeOneRecord(event.get());
+    event.clear();

Review Comment:
   Managing state of the Ring should be done w/in `DisruptorQueue` itself



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DisruptorExecutor<I, O, E> implements HoodieExecutor<I, O, E> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class);
+
+  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
+  private final DisruptorMessageQueue<I, O> queue;
+  private final HoodieExecutorBase<I, O, E> hoodieExecutorBase;
+
+  public DisruptorExecutor(final int bufferSize, final Iterator<I> inputItr,
+                           IteratorBasedQueueConsumer<O, E> consumer, Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, WaitStrategyFactory.DEFAULT_STRATEGY, Functions.noop());
+  }
+
+  public DisruptorExecutor(final int bufferSize, HoodieProducer<I> producer,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, List<HoodieProducer<I>> producers,
+                           Option<IteratorBasedQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
+                           final String waitStrategy, Runnable preExecuteRunnable) {
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable);
+    this.hoodieExecutorBase = new HoodieExecutorBase<>(producers, consumer, preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public ExecutorCompletionService<Boolean> startProducers() {
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<Boolean>(hoodieExecutorBase.getProducerExecutorService());
+    hoodieExecutorBase.getProducers().stream().map(producer -> {
+      return completionService.submit(() -> {
+        try {
+          hoodieExecutorBase.getPreExecuteRunnable().run();
+
+          DisruptorPublisher publisher = new DisruptorPublisher<>(producer, queue);
+          publisher.startProduce();
+
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw new HoodieException("Error producing records in disruptor executor", e);
+        }
+        return true;
+      });
+    }).collect(Collectors.toList());
+    return completionService;
+  }
+
+  @Override
+  public E execute() {
+    try {
+      ValidationUtils.checkState(hoodieExecutorBase.getConsumer().isPresent());
+      startConsumer();
+      ExecutorCompletionService<Boolean> pool = startProducers();
+      return finishConsuming(pool);
+    } catch (InterruptedException ie) {
+      shutdownNow();
+      Thread.currentThread().interrupt();
+      throw new HoodieException(ie);
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  @Override
+  public E finishConsuming(Object o) throws ExecutionException, InterruptedException {

Review Comment:
   A few issue with this API: a) It's not type-safe (which we need to address), b) we don't need to pass in anything into it, instead this API simply 
   
    - Should be called after all producers/consumers have finished
    - Should just be responsible for closing/cleaning up the executor's resources



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.function.Function;
+
+public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class);
+
+  private final Disruptor<HoodieDisruptorEvent<O>> queue;
+  private final Function<I, O> transformFunction;
+  private final RingBuffer<HoodieDisruptorEvent<O>> ringBuffer;
+
+  public DisruptorMessageQueue(int bufferSize, Function<I, O> transformFunction, String waitStrategyName, int totalProducers, Runnable preExecuteRunnable) {
+    WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
+    HoodieDaemonThreadFactory threadFactory = new HoodieDaemonThreadFactory(preExecuteRunnable);
+
+    this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize, threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy);
+    this.ringBuffer = queue.getRingBuffer();
+    this.transformFunction = transformFunction;
+  }
+
+  @Override
+  public long size() {
+    return queue.getBufferSize();
+  }
+
+  @Override
+  public void insertRecord(I value) throws Exception {
+    O applied = transformFunction.apply(value);
+
+    EventTranslator<HoodieDisruptorEvent<O>> translator = new EventTranslator<HoodieDisruptorEvent<O>>() {
+      @Override
+      public void translateTo(HoodieDisruptorEvent<O> event, long sequence) {
+        event.set(applied);
+      }
+    };
+
+    queue.getRingBuffer().publishEvent(translator);
+  }
+
+  @Override
+  public Option<O> readNextRecord() {
+    // Let DisruptorMessageHandler to handle consuming logic.
+    return null;
+  }
+
+  @Override
+  public void close() {
+    queue.shutdown();
+  }
+
+  public void waitForConsumingFinished() throws InterruptedException {
+    while (!isEmpty()) {
+      Thread.sleep(1000);

Review Comment:
   We should not be doing busy-waiting (sleeping) in production code, instead we'd be waiting on a Future, until ones completes (there are `CompletableFuture`s specifically for that)



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageHandler.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventHandler;
+
+public class DisruptorMessageHandler<O, E> implements EventHandler<HoodieDisruptorEvent<O>> {

Review Comment:
   Why do we need Disruptor-specific handler?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorPublisher.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+public class DisruptorPublisher<I, O> {

Review Comment:
   Same comment as above: why do we need Disruptor specific publisher?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.dsl.Disruptor;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DisruptorExecutor<I, O, E> extends HoodieExecutor<I, O, E> {
+
+  private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class);
+
+  // Executor service used for launching write thread.
+  private final ExecutorService producerExecutorService;
+  // Executor service used for launching read thread.
+  private final ExecutorService consumerExecutorService;
+  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
+  private final DisruptorMessageQueue<I, O> queue;
+  // Producers
+  private final List<DisruptorBasedProducer<I>> producers;
+  // Consumer
+  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
+  // pre-execute function to implement environment specific behavior before executors (producers/consumer) run
+  private final Runnable preExecuteRunnable;
+
+  public DisruptorExecutor(final int bufferSize, final Iterator<I> inputItr,
+                           BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, new IteratorBasedDisruptorProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, DisruptorBasedProducer<I> producer,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
+    this(bufferSize, producer, consumer, transformFunction, WaitStrategyFactory.DEFAULT_STRATEGY, Functions.noop());
+  }
+
+  public DisruptorExecutor(final int bufferSize, DisruptorBasedProducer<I> producer,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, String waitStrategy, Runnable preExecuteRunnable) {
+    this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable);
+  }
+
+  public DisruptorExecutor(final int bufferSize, List<DisruptorBasedProducer<I>> producers,
+                           Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
+                           final String waitStrategy, Runnable preExecuteRunnable) {
+    this.producers = producers;
+    this.consumer = consumer;
+    this.preExecuteRunnable = preExecuteRunnable;
+    // Ensure fixed thread for each producer thread
+    this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer"));
+    // Ensure single thread for consumer
+    this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer"));
+    this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable);
+  }
+
+  /**
+   * Start all Producers.
+   */
+  public ExecutorCompletionService<Boolean> startProducers() {
+    final ExecutorCompletionService<Boolean> completionService =
+        new ExecutorCompletionService<Boolean>(producerExecutorService);
+    producers.stream().map(producer -> {
+      return completionService.submit(() -> {
+        try {
+          preExecuteRunnable.run();
+
+          DisruptorPublisher publisher = new DisruptorPublisher<>(producer, queue);
+          publisher.startProduce();
+
+        } catch (Throwable e) {
+          LOG.error("error producing records", e);
+          throw e;
+        }
+        return true;
+      });
+    }).collect(Collectors.toList());
+    return completionService;
+  }
+
+  @Override
+  public E execute() {

Review Comment:
   @zhangyue19921010 let's make sure both BIMQ and Disruptor share `execute` method



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieDisruptorEvent.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+public class HoodieDisruptorEvent<O> {

Review Comment:
   Alright, i see what this is now.
   Let's make this a private subclass of the DisruptorQueue -- this is it's internal impl detail, and there's no reason to expose it outside of it
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org