Posted to issues@uniffle.apache.org by "leixm (via GitHub)" <gi...@apache.org> on 2023/03/28 12:48:27 UTC

[GitHub] [incubator-uniffle] leixm opened a new pull request, #775: [#757] Separate flush thread pools for different storage type.

leixm opened a new pull request, #775:
URL: https://github.com/apache/incubator-uniffle/pull/775

   ### What changes were proposed in this pull request?
   Use separate flush thread pools for different storage types.
   
   ### Why are the changes needed?
   Writing to local files needs less concurrency, while writing to HDFS needs more, so it is better to give each storage type its own thread pool.
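   A minimal sketch of the idea follows. It assumes a simplified `StorageKind` enum in place of the `LocalStorage`/`HdfsStorage` checks the PR uses. Each storage type gets its own independently sized pool, and each flush task is routed to the pool that matches its storage. The class and method names are illustrative, not the PR's code.

   ```java
   import java.util.EnumMap;
   import java.util.Map;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   public class PerStorageFlushRouter {
     // Hypothetical stand-in for the storage selected for an event
     // (the PR checks for LocalStorage vs. HdfsStorage instead).
     enum StorageKind { LOCALFILE, HDFS }

     private final Map<StorageKind, ExecutorService> pools = new EnumMap<>(StorageKind.class);

     PerStorageFlushRouter(int localFileThreads, int hdfsThreads) {
       // Each pool is sized independently: small for local disks, larger for HDFS.
       pools.put(StorageKind.LOCALFILE,
           Executors.newFixedThreadPool(localFileThreads, named("localfile-flush")));
       pools.put(StorageKind.HDFS,
           Executors.newFixedThreadPool(hdfsThreads, named("hdfs-flush")));
     }

     // Names threads "<prefix>-0", "<prefix>-1", ... so each pool is identifiable.
     private static ThreadFactory named(String prefix) {
       AtomicInteger counter = new AtomicInteger();
       return r -> new Thread(r, prefix + "-" + counter.getAndIncrement());
     }

     // Run the flush task on the pool that matches the selected storage.
     void handle(StorageKind kind, Runnable flushTask) {
       ExecutorService pool = pools.get(kind);
       if (pool == null) {
         throw new IllegalArgumentException("Unexpected storage type: " + kind);
       }
       pool.execute(flushTask);
     }

     // Stop accepting tasks and wait for already submitted flushes to finish.
     void shutdown() throws InterruptedException {
       for (ExecutorService pool : pools.values()) {
         pool.shutdown();
         pool.awaitTermination(10, TimeUnit.SECONDS);
       }
     }
   }
   ```

   With this split, a burst of slow HDFS writes can only occupy the HDFS pool, so local-disk flushes keep their own threads.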
   
   ### Does this PR introduce _any_ user-facing change?
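   Yes. `rss.server.flush.threadPool.size` is replaced by `rss.server.flush.local-file.threadPool.size` and `rss.server.flush.hdfs.threadPool.size`.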
   
   ### How was this patch tested?
   Existing UTs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1569429903

   @zuston  PTAL.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153129328


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")
       .intType()
       .defaultValue(10)
-      .withDescription("thread pool for flush data to file");
+      .withDescription("thread pool for flush data to local file");
+
+  public static final ConfigOption<Integer> SERVER_FLUSH_HDFS_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.hdfs.threadPool.size")
+      .intType()
+      .defaultValue(10)
+      .withDescription("thread pool for flush data to hdfs");
 
   public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions

Review Comment:
   Let's eliminate this pending queue and unify it with the event handler.





[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1154343048


##########
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java:
##########
@@ -66,6 +66,7 @@ public void closeClient() {
 
   @Test
   public void fallbackTest() throws Exception {
+    REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";

Review Comment:
   > Changing it to a local variable may be better.
   
   Other methods use it too.





[GitHub] [incubator-uniffle] smallzhongfeng commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491645528

   We should add this change to uniffle-migration-guide.md.




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1259448828


##########
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java:
##########
@@ -107,7 +107,10 @@ public synchronized ShuffleDataFlushEvent toFlushEvent(
         spBlocks,
         isValid,
         this);
-    event.addCleanupCallback(() -> this.clearInFlushBuffer(event.getEventId()));
+    event.addCleanupCallback(() -> {
+      this.clearInFlushBuffer(event.getEventId());
+      spBlocks.forEach(spb -> spb.getData().release());

Review Comment:
   My mistake.
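   For context, the diff above moves buffer release into the event's cleanup callback. A minimal sketch of such a callback list follows. The class name and the `doCleanup()` method are hypothetical. The run-at-most-once guard is an added assumption: it keeps a repeated cleanup from releasing the same buffers twice.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicBoolean;

   public class CleanupEventSketch {
     private final List<Runnable> cleanupCallbacks = new ArrayList<>();
     private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

     // Register work to run when the event is finished, e.g. clearing the
     // in-flush buffer and releasing the blocks' data.
     public synchronized void addCleanupCallback(Runnable callback) {
       cleanupCallbacks.add(callback);
     }

     // Hypothetical: runs every registered callback, at most once in total.
     public void doCleanup() {
       if (!cleanedUp.compareAndSet(false, true)) {
         return; // already cleaned up; do not release buffers twice
       }
       List<Runnable> snapshot;
       synchronized (this) {
         snapshot = new ArrayList<>(cleanupCallbacks);
       }
       for (Runnable callback : snapshot) {
         callback.run();
       }
     }
   }
   ```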





[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1489837478

   > > It seems to be caused by concurrency issues, which cannot be reproduced in the development environment.
   > 
   > Maybe you can read the UT logs. Do you know how to find them?
   
   Yes, I know. It's already fixed.




[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491753179

   > We should add this change to uniffle-migration-guide.md.
   
   Ok.




[GitHub] [incubator-uniffle] jerqi commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1559540346

   @zuston Could you help review this PR again?




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1151345101


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -145,7 +160,14 @@ protected void eventLoop() {
   protected void processNextEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   I think the storage type should not be switched inside the flushToFile method. When a write fails or the storage is corrupted, the event should be put into pendingEvents, and flushToFile should be performed again.
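   A minimal sketch of the retry flow described above follows. It assumes a simplified event that only tracks its retry count. `flushToFile` makes a single attempt and reports success. A failed event goes back into `pendingEvents`, and it is dropped once it exceeds `retryMax`. The class, the `Predicate`-based `flushToFile`, and `processPendingEvents` as written here are illustrative, not the actual `ShuffleFlushManager` code.

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.function.Predicate;

   public class RetryViaPendingSketch {
     // Hypothetical minimal event that only tracks how often it was retried.
     static final class FlushEvent {
       private int retryTimes;

       int getRetryTimes() {
         return retryTimes;
       }

       void increaseRetryTimes() {
         retryTimes++;
       }
     }

     private final Queue<FlushEvent> pendingEvents = new ArrayDeque<>();
     private final int retryMax;
     // Single write attempt; returns true on success. Storage is not switched inside it.
     private final Predicate<FlushEvent> flushToFile;
     private int droppedEvents;

     RetryViaPendingSketch(int retryMax, Predicate<FlushEvent> flushToFile) {
       this.retryMax = retryMax;
       this.flushToFile = flushToFile;
     }

     void processEvent(FlushEvent event) {
       if (flushToFile.test(event)) {
         return;
       }
       event.increaseRetryTimes();
       if (event.getRetryTimes() > retryMax) {
         droppedEvents++; // give up after retryMax retries
       } else {
         pendingEvents.add(event); // try flushToFile again later
       }
     }

     // Drains only the events pending at call time; re-queued events wait for the next call.
     void processPendingEvents() {
       int count = pendingEvents.size();
       for (int i = 0; i < count; i++) {
         processEvent(pendingEvents.poll());
       }
     }

     int pendingSize() {
       return pendingEvents.size();
     }

     int droppedEvents() {
       return droppedEvents;
     }
   }
   ```

   Keeping storage selection out of `flushToFile` keeps each attempt a plain write whose only outcome is success or failure.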





[GitHub] [incubator-uniffle] jerqi closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types
URL: https://github.com/apache/incubator-uniffle/pull/775




[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491213799

   @zuston  @smallzhongfeng  PTAL.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153925661


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;

Review Comment:
   Not yet. OK, let's keep the current implementation.





[GitHub] [incubator-uniffle] zuston commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491839932

   The design is not clear, and the dependencies are complex. I think it should be refactored in the following steps:
   1. Introduce the eventHandler as this PR does, with a queue inside it. But I hope it does more, including pushing events directly into this queue rather than going through the `processEventThread`.
   2. Remove the pending queue, or unify it into the eventHandler. If we choose the latter, multiple priority queues should be introduced in the eventHandler.
   
   Please let me know if I'm wrong.
   
   cc @jerqi I remember you wanted to remove the pending queue. How about doing it in this PR?
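   A rough sketch of steps 1 and 2 follows. It relies on assumptions the comment does not state. Producers call `handle()` directly on the handler, with no separate dispatch thread. Failed events re-enter the same handler through `handleRetry()` instead of a separate pending queue. Retried events are served after fresh ones. The two priorities are modeled with a single `PriorityBlockingQueue`, plus a sequence number that keeps FIFO order within each priority. All names are hypothetical.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.PriorityBlockingQueue;
   import java.util.concurrent.atomic.AtomicLong;

   public class QueueOwningEventHandler {
     private static final int FRESH = 0; // new flush events
     private static final int RETRY = 1; // events that failed and are retried

     // Lower priority value is served first; seq keeps FIFO order within a priority.
     private static final class Item implements Comparable<Item> {
       final Runnable event;
       final int priority;
       final long seq;

       Item(Runnable event, int priority, long seq) {
         this.event = event;
         this.priority = priority;
         this.seq = seq;
       }

       @Override
       public int compareTo(Item other) {
         int byPriority = Integer.compare(priority, other.priority);
         return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
       }
     }

     private final PriorityBlockingQueue<Item> queue = new PriorityBlockingQueue<>();
     private final AtomicLong seq = new AtomicLong();
     private final int workerCount;
     private ExecutorService workers;

     public QueueOwningEventHandler(int workerCount) {
       this.workerCount = workerCount;
     }

     // Producers push straight into the handler's queue.
     public void handle(Runnable event) {
       queue.put(new Item(event, FRESH, seq.getAndIncrement()));
     }

     // Failed events re-enter the same handler instead of a separate pending queue.
     public void handleRetry(Runnable event) {
       queue.put(new Item(event, RETRY, seq.getAndIncrement()));
     }

     public synchronized void start() {
       workers = Executors.newFixedThreadPool(workerCount);
       for (int i = 0; i < workerCount; i++) {
         workers.execute(this::workLoop);
       }
     }

     private void workLoop() {
       try {
         while (!Thread.currentThread().isInterrupted()) {
           Item item = queue.take();
           try {
             item.event.run();
           } catch (RuntimeException e) {
             // Keep the worker alive if a single event fails.
           }
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // exit on shutdown
       }
     }

     public synchronized void stop() {
       if (workers != null) {
         workers.shutdownNow(); // interrupts workers blocked in take()
       }
     }
   }
   ```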




[GitHub] [incubator-uniffle] zuston commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1560349513

   > @zuston Could you help review this PR again?
   
   Yes, I'll take a look in the next few days.




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1152864998


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+
+  public StorageTypeFlushEventHandler(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager,
+      StorageManager storageManager) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.shuffleFlushManager = shuffleFlushManager;
+    this.storageManager = storageManager;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    Storage storage = storageManager.selectStorage(event);
+    if (storage instanceof HdfsStorage) {
+      hdfsThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, false));
+    } else if (storage instanceof LocalStorage) {
+      localFileThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, true));
+    } else {
+      throw new RssException("Unexpected storage type!");
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      shuffleFlushManager.processEvent(event);
+    } finally {

Review Comment:
   No need.



##########
server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java:
##########
@@ -587,6 +587,7 @@ public void shuffleFlushThreshold() throws Exception {
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31));
     assertEquals(96 + 63, shuffleBufferManager.getUsedMemory());
     shuffleFlushManager.flush();
+    Thread.sleep(100);

Review Comment:
   The `createFlushEventExecutor` method in server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java made this path synchronous, which is why we did not need to sleep here before.
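   A minimal sketch of that pattern follows. It assumes the handler obtains its executor from an overridable `createFlushEventExecutor` method, as in the PR. A test subclass returns a direct executor (`Runnable::run`), so each event is processed on the calling thread and the assertion can follow immediately, without `Thread.sleep`. The class names are hypothetical.

   ```java
   import java.util.concurrent.Executor;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicInteger;

   public class SyncExecutorInTestsSketch {
     // Simplified handler; in production the executor is a real thread pool.
     static class FlushHandler {
       final AtomicInteger flushed = new AtomicInteger();
       private final Executor executor;

       FlushHandler() {
         // Overridable call from the constructor, mirroring initFlushEventExecutor().
         this.executor = createFlushEventExecutor(4, "FlushEventThreadPool");
       }

       // threadFactoryName is unused in this simplified version.
       protected Executor createFlushEventExecutor(int poolSize, String threadFactoryName) {
         return Executors.newFixedThreadPool(poolSize);
       }

       void handle(Runnable event) {
         executor.execute(() -> {
           event.run();
           flushed.incrementAndGet();
         });
       }
     }

     // Test-only subclass: events run on the caller's thread, so the result
     // is visible as soon as handle() returns.
     static class SyncFlushHandler extends FlushHandler {
       @Override
       protected Executor createFlushEventExecutor(int poolSize, String threadFactoryName) {
         return Runnable::run;
       }
     }
   }
   ```

   Calling an overridable method from a constructor is safe here only because the test subclass declares no fields of its own for the override to read.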



##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+
+  public StorageTypeFlushEventHandler(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager,
+      StorageManager storageManager) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.shuffleFlushManager = shuffleFlushManager;
+    this.storageManager = storageManager;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    Storage storage = storageManager.selectStorage(event);
+    if (storage instanceof HdfsStorage) {
+      hdfsThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, false));
+    } else if (storage instanceof LocalStorage) {
+      localFileThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, true));
+    } else {
+      throw new RssException("Unexpected storage type!");
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      shuffleFlushManager.processEvent(event);
+    } finally {

Review Comment:
   ShuffleFlushManager#processEvent already handles exceptions.



##########
server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java:
##########
@@ -580,4 +580,58 @@ public void processPendingEventsTest(@TempDir File tempDir) throws Exception {
     assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
     assertEquals(0, manager.getPendingEventsSize());
   }
+
+  @Test
+  public void storageTypeFlushEventHandlerTest(@TempDir File tempDir) throws Exception {
+    shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.toString());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_HDFS_THREAD_POOL_SIZE, 1);

Review Comment:
   Ok.





[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153137043


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+
+  public StorageTypeFlushEventHandler(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager,
+      StorageManager storageManager) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.shuffleFlushManager = shuffleFlushManager;
+    this.storageManager = storageManager;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    Storage storage = storageManager.selectStorage(event);
+    if (storage instanceof HdfsStorage) {
+      hdfsThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, false));
+    } else if (storage instanceof LocalStorage) {
+      localFileThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, true));
+    } else {
+      throw new RssException("Unexpected storage type!");
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      shuffleFlushManager.processEvent(event);
+    } finally {
+      if (isLocalFile) {
+        ShuffleServerMetrics.counterLocalFileEventFlush.inc();
+      } else {
+        ShuffleServerMetrics.counterHdfsEventFlush.inc();
+      }
+    }
+  }
+
+  protected void initFlushEventExecutor() {
+    if (StorageType.withLocalfile(storageType)) {
+      int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE);
+      localFileThreadPoolExecutor = createFlushEventExecutor(poolSize, "LocalFileFlushEventThreadPool");
+    }
+    if (StorageType.withHDFS(storageType)) {
+      int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HDFS_THREAD_POOL_SIZE);
+      hdfsThreadPoolExecutor = createFlushEventExecutor(poolSize, "HdfsFlushEventThreadPool");
+    }
+  }
+
+  protected Executor createFlushEventExecutor(int poolSize, String threadFactoryName) {
+    int waitQueueSize = shuffleServerConf.getInteger(
+        ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);

Review Comment:
   See here @zuston 
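   The code above passes the same `SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE` value to every pool it creates. The rest of `createFlushEventExecutor` is cut off in this diff. The following is therefore only a sketch built from plain JDK classes (not Uniffle's `ThreadUtils`): a fixed-size pool with a bounded wait queue and named daemon threads. In this sketch every call creates its own queue, so enabling both the local-file and HDFS pools allows up to twice `waitQueueSize` queued events in total.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   public class BoundedFlushExecutors {
     // Fixed-size pool whose backlog is capped at waitQueueSize. With the default
     // AbortPolicy, submissions beyond that throw RejectedExecutionException.
     static ThreadPoolExecutor createFlushEventExecutor(
         int poolSize, int waitQueueSize, String threadFactoryName) {
       BlockingQueue<Runnable> waitQueue = new LinkedBlockingQueue<>(waitQueueSize);
       AtomicInteger counter = new AtomicInteger();
       ThreadFactory factory = r -> {
         Thread t = new Thread(r, threadFactoryName + "-" + counter.getAndIncrement());
         t.setDaemon(true); // do not keep the JVM alive on shutdown
         return t;
       };
       return new ThreadPoolExecutor(
           poolSize, poolSize, 60L, TimeUnit.SECONDS, waitQueue, factory);
     }
   }
   ```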





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1154572080


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,75 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {

Review Comment:
   I'm ok.



[GitHub] [incubator-uniffle] leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types
URL: https://github.com/apache/incubator-uniffle/pull/775


[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1488427759

   It seems to be caused by concurrency issues, which cannot be reproduced in the development environment.


[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153128514


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,79 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = storageManager.selectStorage(event);
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {
+            LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
+            return true;
           }
-        }
-
-        String user = StringUtils.defaultString(
-            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            shuffleServerId,
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyOfSingleOnePartition);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-          ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
           event.increaseRetryTimes();
           ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          event.markPended();
+          addPendingEvents(event);

Review Comment:
   Makes sense.
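The `!storage.canWrite()` branch discussed above can be isolated as a small decision function. This is a hedged sketch. `PendingEvent`, `UnwritableOutcome` and `UnwritableStorageHandler` are hypothetical stand-ins for `ShuffleDataFlushEvent` and `ShuffleFlushManager`, and metrics and logging are omitted. The diff is truncated before the `retryTimes > retryMax` case, so its handling here is assumed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for ShuffleDataFlushEvent: only the fields the branch reads.
final class PendingEvent {
  int retryTimes;
  boolean pended;
}

enum UnwritableOutcome { DROPPED, PENDED, RETRIES_EXHAUSTED }

final class UnwritableStorageHandler {
  private final int retryMax;
  final Queue<PendingEvent> pendingEvents = new ArrayDeque<>();

  UnwritableStorageHandler(int retryMax) {
    this.retryMax = retryMax;
  }

  // Mirrors the shape of the `!storage.canWrite()` branch in the diff above.
  UnwritableOutcome onCannotWrite(PendingEvent event) {
    if (event.retryTimes > retryMax) {
      // Assumption: the diff is cut off before this case; treat it as terminal.
      return UnwritableOutcome.RETRIES_EXHAUSTED;
    }
    if (event.pended) {
      // Already waited in the pending queue once: drop instead of re-queueing.
      return UnwritableOutcome.DROPPED;
    }
    event.retryTimes++;
    event.pended = true;
    pendingEvents.add(event);
    return UnwritableOutcome.PENDED;
  }
}
```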



##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;

Review Comment:
   Ok.



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1202443460


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -160,102 +109,88 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
-          }
-        }
+      if (event.isPended()
+              && System.currentTimeMillis() - event.getStartPendingTime() > pendingEventTimeoutSec * 1000L) {
+        ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+        LOG.error("Flush event cannot be flushed for {} sec, the event {} is dropped",
+            pendingEventTimeoutSec, event);
+        return true;
+      }
 
-        String user = StringUtils.defaultString(
-            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        int maxConcurrencyPerPartitionToWrite = getMaxConcurrencyPerPartitionWrite(event);
-        CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            getShuffleServerId(),
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyPerPartitionToWrite);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-          ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager

Review Comment:
   Should we remove the `todo`?
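The pending-timeout check in the diff above compares elapsed milliseconds with `pendingEventTimeoutSec * 1000L`. If `pendingEventTimeoutSec` is an `int`, the `L` suffix promotes the multiplication to `long`, so a large timeout cannot overflow. A minimal sketch with the clock passed in, so the boundary can be tested without sleeping. `PendingTimeout.shouldDrop` is a hypothetical helper, not part of the PR.

```java
// Hypothetical helper isolating the pending-timeout condition from the diff above.
final class PendingTimeout {
  private PendingTimeout() {}

  static boolean shouldDrop(boolean pended, long startPendingTimeMs, long nowMs, int pendingEventTimeoutSec) {
    // `* 1000L` widens to long before multiplying; `* 1000` would overflow for large int timeouts.
    return pended && nowMs - startPendingTimeMs > pendingEventTimeoutSec * 1000L;
  }
}
```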



[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1259450264


##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -181,6 +181,7 @@ public Storage selectStorage(ShuffleDataFlushEvent event) {
               storage.getBasePath(), event);
         }
       } else {
+        event.setUnderStorage(storage);

Review Comment:
   #881 already added this:
   ```
   if (event.getUnderStorage() == null) {
     event.setUnderStorage(storage);
   }
   ```
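The guard quoted above makes storage selection sticky per event. Only the first selection is recorded, so later readers such as the flush-pool dispatcher see the same storage. A minimal sketch, with storage modeled as a `String` and `FlushEventStub` and `StorageSelection.selectOnce` as hypothetical names. The check-then-set is not atomic, so this assumes that only one thread selects storage for a given event.

```java
import java.util.function.Supplier;

// Hypothetical stand-in: an event that remembers the storage chosen for it.
final class FlushEventStub {
  private String underStorage;

  String getUnderStorage() {
    return underStorage;
  }

  void setUnderStorage(String storage) {
    this.underStorage = storage;
  }
}

final class StorageSelection {
  private StorageSelection() {}

  // Only the first selection is recorded; later calls reuse it without invoking the selector.
  static String selectOnce(FlushEventStub event, Supplier<String> selector) {
    if (event.getUnderStorage() == null) {
      event.setUnderStorage(selector.get());
    }
    return event.getUnderStorage();
  }
}
```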



[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1214170101


##########
server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java:
##########
@@ -107,7 +107,10 @@ public synchronized ShuffleDataFlushEvent toFlushEvent(
         spBlocks,
         isValid,
         this);
-    event.addCleanupCallback(() -> this.clearInFlushBuffer(event.getEventId()));
+    event.addCleanupCallback(() -> {
+      this.clearInFlushBuffer(event.getEventId());
+      spBlocks.forEach(spb -> spb.getData().release());

Review Comment:
   > spBlocks.forEach(spb -> spb.getData().release());
   
   Is this for Netty?
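This relates to the question above. Suppose `spb.getData()` returns a reference-counted buffer such as Netty's `ByteBuf` (an assumption, since the diff does not show the type). Then `release()` decrements its reference count, and the memory is freed only when the count reaches zero, so releasing in the cleanup callback ties the buffer's lifetime to the flush event. The sketch below models this with a hypothetical `RefCountedBuffer` and `CleanupCallbacks`. It does not use Netty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal reference-counted buffer, standing in for something like Netty's ByteBuf.
final class RefCountedBuffer {
  private final AtomicInteger refCnt = new AtomicInteger(1);

  // Returns true when this call dropped the count to zero (where memory would be freed).
  boolean release() {
    int remaining = refCnt.decrementAndGet();
    if (remaining < 0) {
      throw new IllegalStateException("buffer released too many times");
    }
    return remaining == 0;
  }

  int refCnt() {
    return refCnt.get();
  }
}

// Hypothetical callback holder; in this sketch runAll() is called once the event is finished with.
final class CleanupCallbacks {
  private final List<Runnable> callbacks = new ArrayList<>();

  void add(Runnable callback) {
    callbacks.add(callback);
  }

  void runAll() {
    callbacks.forEach(Runnable::run);
  }
}
```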



##########
server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java:
##########
@@ -181,6 +181,7 @@ public Storage selectStorage(ShuffleDataFlushEvent event) {
               storage.getBasePath(), event);
         }
       } else {
+        event.setUnderStorage(storage);

Review Comment:
   Is this to check whether the storage is HDFS or localfile, so that the event is put into its corresponding thread pool?
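The routing being asked about can be sketched as a map from storage kind to executor. The PR itself uses `instanceof HdfsStorage` and `instanceof LocalStorage`. The `StorageKind` enum and `FlushDispatcher` below are hypothetical. They only illustrate dispatching each event to the pool of the storage selected for it.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical kinds; the PR distinguishes HdfsStorage and LocalStorage with instanceof.
enum StorageKind { LOCALFILE, HDFS }

final class FlushDispatcher {
  private final Map<StorageKind, Executor> pools = new EnumMap<>(StorageKind.class);

  void register(StorageKind kind, Executor pool) {
    pools.put(kind, pool);
  }

  // Route the flush task to the pool dedicated to the storage chosen for the event.
  void dispatch(StorageKind kind, Runnable flushTask) {
    Executor pool = pools.get(kind);
    if (pool == null) {
      // Analogous to the PR's "Unexpected storage type!" branch.
      throw new IllegalStateException("No flush pool for storage type " + kind);
    }
    pool.execute(flushTask);
  }
}
```

Keying on an enum keeps the dispatcher independent of the concrete `Storage` classes. The trade-off is that each storage must expose its kind.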



[GitHub] [incubator-uniffle] leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types
URL: https://github.com/apache/incubator-uniffle/pull/775


[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1154248152


##########
integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceBase.java:
##########
@@ -66,6 +66,7 @@ public void closeClient() {
 
   @Test
   public void fallbackTest() throws Exception {
+    REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";

Review Comment:
   Changing this to a local variable may be better.



[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1489837091

   @jerqi  @smallzhongfeng  PTAL.


[GitHub] [incubator-uniffle] jerqi commented on pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1488739363

   > It seems to be caused by concurrency issues, which cannot be reproduced in the development environment.
   
   Maybe you can read the UT logs. Do you know how to find them?


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1151413422


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -145,7 +160,14 @@ protected void eventLoop() {
   protected void processNextEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   +1. 



[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1151317147


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -145,7 +160,14 @@ protected void eventLoop() {
   protected void processNextEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   It looks strange to select the storage here, since it will be selected again before the flush.
   
   Also, the selected storage may change if a write fails. I think we had better refactor the flushing part.



[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1202451284


##########
server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class DefaultFlushEventHandler implements FlushEventHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultFlushEventHandler.class);
+
+  private final ShuffleServerConf shuffleServerConf;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+  protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue();
+  private Consumer<ShuffleDataFlushEvent> eventConsumer;
+
+  public DefaultFlushEventHandler(ShuffleServerConf conf, StorageManager storageManager,
+      Consumer<ShuffleDataFlushEvent> eventConsumer) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.storageManager = storageManager;
+    this.eventConsumer = eventConsumer;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    if (!flushQueue.offer(event)) {
+      LOG.warn("Flush queue is full, discard event: " + event);
+    } else {
+      ShuffleServerMetrics.gaugeEventQueueSize.inc();
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      eventConsumer.accept(event);
+    } finally {
+      if (isLocalFile) {
+        ShuffleServerMetrics.counterLocalFileEventFlush.inc();
+      } else {
+        ShuffleServerMetrics.counterHdfsEventFlush.inc();
+      }
+    }
+  }
+
+  protected void initFlushEventExecutor() {
+    if (StorageType.withLocalfile(storageType)) {
+      int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE);
+      localFileThreadPoolExecutor = createFlushEventExecutor(poolSize, "LocalFileFlushEventThreadPool");
+    }
+    if (StorageType.withHDFS(storageType)) {
+      int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HDFS_THREAD_POOL_SIZE);
+      hdfsThreadPoolExecutor = createFlushEventExecutor(poolSize, "HdfsFlushEventThreadPool");
+    }
+    startEventProcessor();
+  }
+
+  private void startEventProcessor() {
+    // the thread for flush data
+    Thread processEventThread = new Thread(this::eventLoop);
+    processEventThread.setName("ProcessEventThread");
+    processEventThread.setDaemon(true);
+    processEventThread.start();
+  }
+
+  protected void eventLoop() {
+    while (true) {

Review Comment:
   Could we add a variable that controls whether the event loop should stop? Could we also add a `stop` method?
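Here is a minimal sketch of the suggested `stop` method. It assumes a single consumer thread that drains a blocking queue. `StoppableEventLoop`, `submit`, and `stop` are hypothetical names, not the PR's code. The loop polls with a timeout so that a volatile `running` flag is re-checked even when no events arrive.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch, not the PR's DefaultFlushEventHandler: one daemon thread
// drains the queue until stop() clears the running flag.
class StoppableEventLoop<E> {
  private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
  private final Consumer<E> consumer;
  // volatile so the loop thread observes the write made by stop()
  private volatile boolean running = true;
  private final Thread loopThread;

  StoppableEventLoop(Consumer<E> consumer) {
    this.consumer = consumer;
    this.loopThread = new Thread(this::eventLoop, "ProcessEventThread");
    this.loopThread.setDaemon(true);
    this.loopThread.start();
  }

  boolean submit(E event) {
    return queue.offer(event);
  }

  private void eventLoop() {
    while (running) {
      try {
        // Timed poll instead of take(), so the flag is re-checked at least every 100 ms.
        E event = queue.poll(100, TimeUnit.MILLISECONDS);
        if (event != null) {
          consumer.accept(event);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  // Clears the flag and waits for the loop thread to exit.
  void stop() {
    running = false;
    try {
      loopThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  boolean isAlive() {
    return loopThread.isAlive();
  }
}
```

In this sketch, events still queued when `stop()` is called are not processed. A real shutdown might instead drain the queue first, or hand the remaining events back to the caller.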





[GitHub] [incubator-uniffle] leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm closed pull request #775: [#757] feat(server): separate flush thread pools for different storage types
URL: https://github.com/apache/incubator-uniffle/pull/775




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1151330034


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -118,14 +123,24 @@ private void startEventProcessor() {
     processEventThread.start();
   }
 
-  protected Executor createFlushEventExecutor() {
+  protected void initFlushEventExecutor() {

Review Comment:
   Ok.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -145,7 +160,14 @@ protected void eventLoop() {
   protected void processNextEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   > It looks really strange to select the storage here, since it will be selected again before flushing.
   > 
   > And the selected storage will change if the write fails. I think we had better refactor the flushing part.
   
   You're right.





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1486879161

   ## [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/775?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#775](https://codecov.io/gh/apache/incubator-uniffle/pull/775?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9130cab) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/1b48c12026ad7be42368fb6eee2cd9a1dff2bbb5?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1b48c12) will **increase** coverage by `0.89%`.
   > The diff coverage is `100.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #775      +/-   ##
   ============================================
   + Coverage     61.16%   62.05%   +0.89%     
   + Complexity     1985     1847     -138     
   ============================================
     Files           245      231      -14     
     Lines         13422    12124    -1298     
     Branches       1125     1001     -124     
   ============================================
   - Hits           8209     7524     -685     
   + Misses         4751     4173     -578     
   + Partials        462      427      -35     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/775?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...org/apache/uniffle/server/ShuffleFlushManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/775?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlRmx1c2hNYW5hZ2VyLmphdmE=) | `84.80% <100.00%> (+0.34%)` | :arrow_up: |
   | [...a/org/apache/uniffle/server/ShuffleServerConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/775?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL3NlcnZlci9TaHVmZmxlU2VydmVyQ29uZi5qYXZh) | `99.46% <100.00%> (+<0.01%)` | :arrow_up: |
   
   ... and [17 files with indirect coverage changes](https://codecov.io/gh/apache/incubator-uniffle/pull/775/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491779008

   > We should add this change to uniffle-migration-guide.md.
   
   Done.




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153124111


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")
       .intType()
       .defaultValue(10)
-      .withDescription("thread pool for flush data to file");
+      .withDescription("thread pool for flush data to local file");
+
+  public static final ConfigOption<Integer> SERVER_FLUSH_HDFS_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.hdfs.threadPool.size")
+      .intType()
+      .defaultValue(10)
+      .withDescription("thread pool for flush data to hdfs");
 
   public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions

Review Comment:
   This option is still useful: it sizes the waiting queue of the flush thread pool.
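For context on the queue-size option, here is a sketch of a fixed-size flush pool with a bounded waiting queue. It assumes each storage type gets its own pool and that the queue capacity comes from `SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE`. `FlushExecutors.create` is a hypothetical helper, not the PR's `createFlushEventExecutor`, whose body isn't shown here. Choosing `AbortPolicy` for rejection is also this sketch's assumption.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: one fixed-size pool per storage type, with a bounded queue.
final class FlushExecutors {
  private FlushExecutors() {}

  static ThreadPoolExecutor create(int poolSize, int queueSize, String namePrefix) {
    AtomicInteger threadIndex = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, namePrefix + "-" + threadIndex.getAndIncrement());
      t.setDaemon(true); // do not keep the JVM alive just for flush threads
      return t;
    };
    return new ThreadPoolExecutor(
        poolSize,                              // core threads
        poolSize,                              // max threads: fixed-size pool
        60L, TimeUnit.SECONDS,                 // keep-alive for threads above core (none here)
        new ArrayBlockingQueue<>(queueSize),   // bounded waiting queue
        factory,
        new ThreadPoolExecutor.AbortPolicy()); // throw RejectedExecutionException when full
  }
}
```

With separate pools, a slow HDFS write can only fill the HDFS pool's queue. Local-file flushes keep their own threads and queue capacity.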



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")

Review Comment:
   Ok.





[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153957751


##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")
       .intType()
       .defaultValue(10)
-      .withDescription("thread pool for flush data to file");
+      .withDescription("thread pool for flush data to local file");
+
+  public static final ConfigOption<Integer> SERVER_FLUSH_HDFS_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.hdfs.threadPool.size")
+      .intType()
+      .defaultValue(10)
+      .withDescription("thread pool for flush data to hdfs");
 
   public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions

Review Comment:
   I do not understand.





[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1491390541

   @jerqi  PTAL.




[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1151345101


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -145,7 +160,14 @@ protected void eventLoop() {
   protected void processNextEvent() {
     try {
       ShuffleDataFlushEvent event = flushQueue.take();
-      threadPoolExecutor.execute(() -> processEvent(event));
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   I think that the storageType should not be switched in the flushToFile method. When the write fails or the storage is corrupted, the event should be put into pendingEvents, and flushToFile should be performed again.
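Here is a sketch of the retry-through-pendingEvents idea. It assumes `flushToFile` reports success as a boolean, as the later diff's `private boolean flushToFile` signature suggests. `FlushEvent`, `PendingRetryFlusher`, and `processPendingEvents` below are hypothetical single-threaded stand-ins, not the actual `ShuffleFlushManager` code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical stand-in for ShuffleDataFlushEvent: only an id and a retry counter.
class FlushEvent {
  final String id;
  int retryTimes = 0;

  FlushEvent(String id) {
    this.id = id;
  }
}

// Sketch: the writer never switches storage itself. A failed write is parked in
// pendingEvents and retried later, and dropped once retryMax is exceeded.
class PendingRetryFlusher {
  private final Queue<FlushEvent> pendingEvents = new ArrayDeque<>();
  private final Predicate<FlushEvent> writer; // returns true when the write succeeded
  private final int retryMax;
  private int droppedEvents = 0;

  PendingRetryFlusher(Predicate<FlushEvent> writer, int retryMax) {
    this.writer = writer;
    this.retryMax = retryMax;
  }

  void flush(FlushEvent event) {
    if (writer.test(event)) {
      return;
    }
    event.retryTimes++;
    if (event.retryTimes > retryMax) {
      droppedEvents++;
    } else {
      pendingEvents.add(event);
    }
  }

  // Retries each currently pending event once; the writer may pick storage afresh each time.
  void processPendingEvents() {
    int n = pendingEvents.size();
    for (int i = 0; i < n; i++) {
      flush(pendingEvents.poll());
    }
  }

  int pendingSize() {
    return pendingEvents.size();
  }

  int droppedEvents() {
    return droppedEvents;
  }
}
```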





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "jerqi (via GitHub)" <gi...@apache.org>.
jerqi commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1150705750


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -118,14 +123,24 @@ private void startEventProcessor() {
     processEventThread.start();
   }
 
-  protected Executor createFlushEventExecutor() {
+  protected void initFlushEventExecutor() {

Review Comment:
   We shouldn't expose the concept of storageType in the FlushManager.





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1152868628


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+
+  public StorageTypeFlushEventHandler(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager,
+      StorageManager storageManager) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.shuffleFlushManager = shuffleFlushManager;
+    this.storageManager = storageManager;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    Storage storage = storageManager.selectStorage(event);
+    if (storage instanceof HdfsStorage) {
+      hdfsThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, false));
+    } else if (storage instanceof LocalStorage) {
+      localFileThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, true));
+    } else {
+      throw new RssException("Unexpected storage type!");
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      shuffleFlushManager.processEvent(event);
+    } finally {

Review Comment:
   Sorry, I overlooked it.





[GitHub] [incubator-uniffle] leixm commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153191252


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;

Review Comment:
   > Can we design a unified mechanism to avoid hard-coding? Then we wouldn't need to change this part when introducing a new storage type such as an object store.
   
   Do you have a better suggestion? We need to decide which thread pool to use based on the type of `Storage`. `MultiStorageManager` also uses hard-coding:
   ```java
   private final StorageManager warmStorageManager;
   private final StorageManager coldStorageManager;
   
   warmStorageManager = new LocalStorageManager(conf);
   coldStorageManager = new HdfsStorageManager(conf);
   ```
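One possible answer to the hard-coding question is to register one executor per storage class and look it up with `Class.isInstance`. This keeps the same semantics as the `instanceof` chain in `handle`. The sketch rests on some assumptions. `Storage`, `LocalStorage`, and `HdfsStorage` below are minimal stand-ins for the real classes. `StorageExecutorRegistry` is hypothetical, and it throws `IllegalStateException` where the PR throws `RssException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Minimal stand-ins for the real storage classes (hypothetical, for illustration only).
interface Storage {}

class LocalStorage implements Storage {}

class HdfsStorage implements Storage {}

// Sketch: supporting a new storage (e.g. an object store) becomes one register() call
// at startup instead of a new if/else branch in the handler.
class StorageExecutorRegistry {
  private final Map<Class<? extends Storage>, Executor> executors = new LinkedHashMap<>();

  StorageExecutorRegistry register(Class<? extends Storage> type, Executor executor) {
    executors.put(type, executor);
    return this;
  }

  // Same semantics as the instanceof chain: the first registered type that matches wins.
  Executor executorFor(Storage storage) {
    for (Map.Entry<Class<? extends Storage>, Executor> entry : executors.entrySet()) {
      if (entry.getKey().isInstance(storage)) {
        return entry.getValue();
      }
    }
    throw new IllegalStateException("Unexpected storage type: " + storage.getClass().getName());
  }
}
```

The pool sizes would still come from per-type config keys. This design only moves the type-to-pool mapping out of the dispatch code.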





[GitHub] [incubator-uniffle] smallzhongfeng commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "smallzhongfeng (via GitHub)" <gi...@apache.org>.
smallzhongfeng commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1152858842


##########
server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java:
##########
@@ -587,6 +587,7 @@ public void shuffleFlushThreshold() throws Exception {
     shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31));
     assertEquals(96 + 63, shuffleBufferManager.getUsedMemory());
     shuffleFlushManager.flush();
+    Thread.sleep(100);

Review Comment:
   Why do we need to sleep here?
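A common alternative to a fixed `Thread.sleep(100)` is to poll, with a timeout, for the condition the test actually depends on. The test then neither flakes on a slow machine nor waits longer than needed. `TestWait.waitUntil` is a hypothetical helper built only on the JDK.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only test helper: polls a condition until it holds or time runs out.
final class TestWait {
  private TestWait() {}

  static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      // Compare via subtraction so a nanoTime overflow is handled correctly.
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(pollIntervalMs));
    }
  }
}
```

In `shuffleFlushThreshold`, the condition asserted after `shuffleFlushManager.flush()` would become the supplier passed to `waitUntil`, replacing the sleep.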
   
   



##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;
+  private Executor hdfsThreadPoolExecutor;
+  private final StorageType storageType;
+
+  public StorageTypeFlushEventHandler(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager,
+      StorageManager storageManager) {
+    this.shuffleServerConf = conf;
+    this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
+    this.shuffleFlushManager = shuffleFlushManager;
+    this.storageManager = storageManager;
+    initFlushEventExecutor();
+  }
+
+  @Override
+  public void handle(ShuffleDataFlushEvent event) {
+    Storage storage = storageManager.selectStorage(event);
+    if (storage instanceof HdfsStorage) {
+      hdfsThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, false));
+    } else if (storage instanceof LocalStorage) {
+      localFileThreadPoolExecutor.execute(() -> handleEventAndUpdateMetrics(event, true));
+    } else {
+      throw new RssException("Unexpected storage type!");
+    }
+  }
+
+  private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, boolean isLocalFile) {
+    try {
+      shuffleFlushManager.processEvent(event);
+    } finally {

Review Comment:
   Should we catch some exceptions and add logs here?
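Here is a sketch of catching and logging in `handleEventAndUpdateMetrics`. It assumes failures should be logged and counted, while the per-storage flush counters are still incremented in `finally`. It uses `java.util.logging` and `AtomicLong` counters only to stay self-contained. The PR uses SLF4J and `ShuffleServerMetrics`, and the `failures` counter here is hypothetical. Without a catch, an exception thrown by a task passed to `ThreadPoolExecutor.execute` terminates that worker thread, which the pool then replaces, and the exception is reported only through the uncaught-exception handler.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of the flush task body with error handling.
class GuardedFlushTask<E> {
  private static final Logger LOG = Logger.getLogger(GuardedFlushTask.class.getName());
  // Stand-ins for ShuffleServerMetrics.counterLocalFileEventFlush / counterHdfsEventFlush.
  final AtomicLong localFileFlushes = new AtomicLong();
  final AtomicLong hdfsFlushes = new AtomicLong();
  final AtomicLong failures = new AtomicLong(); // hypothetical extra counter
  private final Consumer<E> eventConsumer;

  GuardedFlushTask(Consumer<E> eventConsumer) {
    this.eventConsumer = eventConsumer;
  }

  void handleEventAndUpdateMetrics(E event, boolean isLocalFile) {
    try {
      eventConsumer.accept(event);
    } catch (RuntimeException e) {
      // Log with the event for context instead of letting the worker thread die.
      failures.incrementAndGet();
      LOG.log(Level.WARNING, "Failed to flush event " + event, e);
    } finally {
      if (isLocalFile) {
        localFileFlushes.incrementAndGet();
      } else {
        hdfsFlushes.incrementAndGet();
      }
    }
  }
}
```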



##########
server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java:
##########
@@ -580,4 +580,58 @@ public void processPendingEventsTest(@TempDir File tempDir) throws Exception {
     assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
     assertEquals(0, manager.getPendingEventsSize());
   }
+
+  @Test
+  public void storageTypeFlushEventHandlerTest(@TempDir File tempDir) throws Exception {
+    shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.toString());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_HDFS_THREAD_POOL_SIZE, 1);

Review Comment:
   Maybe we can change it to the default value now.





[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] Separate flush thread pools for different storage type.

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1153092102


##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {

Review Comment:
   Rename to `DefaultFlushEventHandler`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,79 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = storageManager.selectStorage(event);

Review Comment:
   I'm still not satisfied with this `selectStorage` call.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")

Review Comment:
   Rename `rss.server.flush.local-file.threadPool.size` to `rss.server.flush.localfile.threadPool.size`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,79 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = storageManager.selectStorage(event);
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {
+            LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
+            return true;
           }
-        }
-
-        String user = StringUtils.defaultString(
-            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            shuffleServerId,
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyOfSingleOnePartition);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-          ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
           event.increaseRetryTimes();
           ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          event.markPended();
+          addPendingEvents(event);

Review Comment:
   How about re-pushing the event to the `flushEventHandler` queue instead? I think the `pending` queue could then be removed, since it looks out of place after this PR's refactoring.
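Here is a minimal sketch of the suggestion above. It assumes a handler that owns a single queue. When a write fails, the handler increments the event's retry counter and puts the event back on that same queue. Once `retryMax` retries are used up, it drops the event. `FlushTask`, `RequeueingFlushHandler`, and the `writer` predicate are hypothetical stand-ins, not Uniffle classes. `drain()` runs synchronously only to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical stand-in for ShuffleDataFlushEvent; only the retry counter matters here.
final class FlushTask {
  final String id;
  int retryTimes;

  FlushTask(String id) {
    this.id = id;
  }
}

// Hypothetical handler: a failed event is re-pushed onto the handler's own queue,
// so no separate pending queue is needed.
final class RequeueingFlushHandler {
  private final Queue<FlushTask> queue = new ArrayDeque<>();
  private final int retryMax;
  private final Predicate<FlushTask> writer; // returns true when the write succeeds
  int succeeded;
  int dropped;

  RequeueingFlushHandler(int retryMax, Predicate<FlushTask> writer) {
    this.retryMax = retryMax;
    this.writer = writer;
  }

  void handle(FlushTask task) {
    queue.offer(task);
  }

  // Processes queued events on the calling thread; a server would run this on a pool.
  void drain() {
    FlushTask task;
    while ((task = queue.poll()) != null) {
      if (writer.test(task)) {
        succeeded++;
      } else if (task.retryTimes < retryMax) {
        task.retryTimes++;
        queue.offer(task); // back to the tail of the same queue
      } else {
        dropped++; // retries exhausted, so this event's data is lost
      }
    }
  }
}
```

With this shape, an event gets at most `retryMax + 1` write attempts. Other events in the queue are interleaved between its retries instead of waiting behind a tight retry loop.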



##########
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Queues;
+
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.storage.common.HdfsStorage;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.common.Storage;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class StorageTypeFlushEventHandler implements FlushEventHandler {
+  private final ShuffleServerConf shuffleServerConf;
+  private final ShuffleFlushManager shuffleFlushManager;
+  private final StorageManager storageManager;
+  private Executor localFileThreadPoolExecutor;

Review Comment:
   Can we design a unified mechanism instead of hard-coding each storage type? Then this part would not need to change when a new storage type, such as an object store, is introduced.
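Here is one possible shape for such a mechanism. It assumes pools are keyed by a storage-type name and sized from a lookup table. Pools are created lazily per type, and unknown types fall back to a default size. Supporting a new type then becomes a configuration change rather than a new field. `FlushPoolRegistry` and its constructor arguments are hypothetical and do not correspond to existing Uniffle config keys.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical registry: one flush pool per storage type, created on demand,
// so adding a type (e.g. an object store) needs no new field or branch.
final class FlushPoolRegistry implements AutoCloseable {
  private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();
  private final Map<String, Integer> poolSizes; // storage type -> thread count
  private final int defaultSize;

  FlushPoolRegistry(Map<String, Integer> poolSizes, int defaultSize) {
    this.poolSizes = Map.copyOf(poolSizes);
    this.defaultSize = defaultSize;
  }

  // Returns the pool for a type, creating it once with the configured size.
  ExecutorService poolFor(String storageType) {
    return pools.computeIfAbsent(storageType,
        t -> Executors.newFixedThreadPool(poolSizes.getOrDefault(t, defaultSize)));
  }

  void submit(String storageType, Runnable flushTask) {
    poolFor(storageType).execute(flushTask);
  }

  int poolCount() {
    return pools.size();
  }

  @Override
  public void close() {
    pools.values().forEach(ExecutorService::shutdown);
  }
}
```

`computeIfAbsent` on a `ConcurrentHashMap` creates each pool at most once, even when flush events for a new type arrive concurrently.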



##########
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java:
##########
@@ -82,11 +82,17 @@ public class ShuffleServerConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("rss heartbeat interval ms");
 
-  public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
-      .key("rss.server.flush.threadPool.size")
+  public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.local-file.threadPool.size")
       .intType()
       .defaultValue(10)
-      .withDescription("thread pool for flush data to file");
+      .withDescription("thread pool for flush data to local file");
+
+  public static final ConfigOption<Integer> SERVER_FLUSH_HDFS_THREAD_POOL_SIZE = ConfigOptions
+      .key("rss.server.flush.hdfs.threadPool.size")
+      .intType()
+      .defaultValue(10)
+      .withDescription("thread pool for flush data to hdfs");
 
   public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions

Review Comment:
   Why not remove this?
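If the option is kept, it could still apply after the split as the capacity of each per-type pool's work queue. The sketch below assumes plain `java.util.concurrent` pools rather than Uniffle's actual construction. It shows what a bounded queue changes: once the threads are busy and the queue is full, new flush tasks are rejected instead of piling up. `BoundedFlushPools` is a hypothetical helper.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a fixed-size flush pool whose work queue is bounded.
final class BoundedFlushPools {
  static ThreadPoolExecutor create(int threads, int queueSize) {
    return new ThreadPoolExecutor(
        threads, threads, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(queueSize),     // at most queueSize waiting tasks
        new ThreadPoolExecutor.AbortPolicy());   // reject when threads and queue are full
  }

  // Returns false instead of throwing when the pool is saturated.
  static boolean trySubmit(ThreadPoolExecutor pool, Runnable task) {
    try {
      pool.execute(task);
      return true;
    } catch (RejectedExecutionException e) {
      return false;
    }
  }
}
```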





[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1630455762

   I renamed `hdfs thread pool` to `hadoop thread pool`.




[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1630455023

   All done. @jerqi @zuston




[GitHub] [incubator-uniffle] zuston merged pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston merged PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775




[GitHub] [incubator-uniffle] leixm commented on pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "leixm (via GitHub)" <gi...@apache.org>.
leixm commented on PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#issuecomment-1610752699

   I will fix it later.




[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #775: [#757] feat(server): separate flush thread pools for different storage types

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on code in PR #775:
URL: https://github.com/apache/incubator-uniffle/pull/775#discussion_r1154389602


##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,75 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();

Review Comment:
   When is this event's `storage` set?
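For context, the dispatching handler could set the storage before the event reaches any flush thread. It would select the storage once, record it on the event, and route the event by the storage's type. This is a hedged sketch. `SketchStorage`, `SketchEvent`, `SketchDispatcher`, and the selector function (standing in for `selectStorage`) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Storage.
final class SketchStorage {
  final String type;
  final String host;

  SketchStorage(String type, String host) {
    this.type = type;
    this.host = host;
  }
}

// Hypothetical stand-in for ShuffleDataFlushEvent.
final class SketchEvent {
  final String appId;
  SketchStorage underStorage; // assigned by the dispatcher, read later by the flush

  SketchEvent(String appId) {
    this.appId = appId;
  }
}

// Hypothetical dispatcher: selects the storage once, records it on the event,
// then routes the event to the queue for that storage type.
final class SketchDispatcher {
  private final Function<SketchEvent, SketchStorage> selector; // plays the role of selectStorage
  final Map<String, List<SketchEvent>> queuesByType = new HashMap<>();

  SketchDispatcher(Function<SketchEvent, SketchStorage> selector) {
    this.selector = selector;
  }

  boolean handle(SketchEvent event) {
    SketchStorage storage = selector.apply(event);
    if (storage == null) {
      return false; // nothing to route to
    }
    event.underStorage = storage; // set before any flush thread sees the event
    queuesByType.computeIfAbsent(storage.type, t -> new ArrayList<>()).add(event);
    return true;
  }
}
```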



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,75 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {

Review Comment:
   How about removing this pending queue? WDYT, @jerqi?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,75 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {
+            LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
+            return true;
           }
-        }
-
-        String user = StringUtils.defaultString(
-            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            shuffleServerId,
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyOfSingleOnePartition);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-          ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
           event.increaseRetryTimes();
           ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          event.markPended();
+          addPendingEvents(event);
         }
-      } catch (Throwable throwable) {
-        // just log the error, don't throw the exception and stop the flush thread
-        LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
-        event.increaseRetryTimes();
-      }
-    }
-
-    if (event.getRetryTimes() > retryMax) {
-      LOG.error("Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax);
-      if (event.getUnderStorage() != null) {
-        ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+        return false;
       }
-    }
 
-    event.doCleanup();
-    if (shuffleServer != null) {
-      long duration = System.currentTimeMillis() - start;
+      String user = StringUtils.defaultString(
+          shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+          StringUtils.EMPTY
+      );
+      CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
+          storageType,
+          event.getAppId(),
+          event.getShuffleId(),
+          event.getStartPartition(),
+          event.getEndPartition(),
+          storageBasePaths.toArray(new String[storageBasePaths.size()]),
+          shuffleServerId,
+          hadoopConf,
+          storageDataReplica,
+          user,
+          maxConcurrencyOfSingleOnePartition);
+      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+      writeSuccess = storageManager.write(storage, handler, event);
       if (writeSuccess) {
-        LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
-      } else {
-        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
-        LOG.error("Flush to file for {} failed in {} ms and release {} bytes", event, duration, event.getSize());
+        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
+        ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+      } else if (event.getRetryTimes() <= retryMax) {
+        event.increaseRetryTimes();
+        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+        event.getUnderEventHandler().handle(event);

Review Comment:
   Also, re-pushing the event into its own handler here looks strange.



##########
server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java:
##########
@@ -163,101 +168,75 @@ private void processEvent(ShuffleDataFlushEvent event) {
     }
   }
 
-  private void flushToFile(ShuffleDataFlushEvent event) {
-    long start = System.currentTimeMillis();
+  private boolean flushToFile(ShuffleDataFlushEvent event) {
     boolean writeSuccess = false;
 
-    while (event.getRetryTimes() <= retryMax) {
-      try {
-        if (!event.isValid()) {
-          writeSuccess = true;
-          LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
-          break;
-        }
+    try {
+      if (!event.isValid()) {
+        LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
+        return true;
+      }
 
-        List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
-        if (blocks == null || blocks.isEmpty()) {
-          LOG.info("There is no block to be flushed: {}", event);
-          break;
-        }
+      List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
+      if (blocks == null || blocks.isEmpty()) {
+        LOG.info("There is no block to be flushed: {}", event);
+        return true;
+      }
 
-        Storage storage = storageManager.selectStorage(event);
-        if (storage == null) {
-          LOG.error("Storage selected is null and this should not happen. event: {}", event);
-          break;
-        }
+      Storage storage = event.getUnderStorage();
+      if (storage == null) {
+        LOG.error("Storage selected is null and this should not happen. event: {}", event);
+        return true;
+      }
 
-        if (!storage.canWrite()) {
-          // todo: Could we add an interface supportPending for storageManager
-          //       to unify following logic of multiple different storage managers
-          if (storageManager instanceof MultiStorageManager) {
-            event.increaseRetryTimes();
-            ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
-            continue;
-          } else {
-            // To avoid being re-pushed to pending queue and make the server too much pressure,
-            // it's better to drop directly.
-            if (event.isPended()) {
-              LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
-              break;
-            }
-            event.increaseRetryTimes();
-            event.markPended();
-            addPendingEvents(event);
-            return;
+      if (!storage.canWrite()) {
+        // todo: Could we add an interface supportPending for storageManager
+        //       to unify following logic of multiple different storage managers
+        if (event.getRetryTimes() <= retryMax) {
+          if (event.isPended()) {
+            LOG.error("Drop this event directly due to already having entered pending queue. event: {}", event);
+            return true;
           }
-        }
-
-        String user = StringUtils.defaultString(
-            shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
-            StringUtils.EMPTY
-        );
-        CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
-            storageType,
-            event.getAppId(),
-            event.getShuffleId(),
-            event.getStartPartition(),
-            event.getEndPartition(),
-            storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            shuffleServerId,
-            hadoopConf,
-            storageDataReplica,
-            user,
-            maxConcurrencyOfSingleOnePartition);
-        ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
-        writeSuccess = storageManager.write(storage, handler, event);
-        if (writeSuccess) {
-          updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
-          ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
-          break;
-        } else {
           event.increaseRetryTimes();
           ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+          event.markPended();
+          addPendingEvents(event);
         }
-      } catch (Throwable throwable) {
-        // just log the error, don't throw the exception and stop the flush thread
-        LOG.error("Exception happened when process flush shuffle data for {}", event, throwable);
-        event.increaseRetryTimes();
-      }
-    }
-
-    if (event.getRetryTimes() > retryMax) {
-      LOG.error("Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax);
-      if (event.getUnderStorage() != null) {
-        ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost());
+        return false;
       }
-    }
 
-    event.doCleanup();
-    if (shuffleServer != null) {
-      long duration = System.currentTimeMillis() - start;
+      String user = StringUtils.defaultString(
+          shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+          StringUtils.EMPTY
+      );
+      CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest(
+          storageType,
+          event.getAppId(),
+          event.getShuffleId(),
+          event.getStartPartition(),
+          event.getEndPartition(),
+          storageBasePaths.toArray(new String[storageBasePaths.size()]),
+          shuffleServerId,
+          hadoopConf,
+          storageDataReplica,
+          user,
+          maxConcurrencyOfSingleOnePartition);
+      ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request);
+      writeSuccess = storageManager.write(storage, handler, event);
       if (writeSuccess) {
-        LOG.debug("Flush to file success in {} ms and release {} bytes", duration, event.getSize());
-      } else {
-        ShuffleServerMetrics.counterTotalFailedWrittenEventNum.inc();
-        LOG.error("Flush to file for {} failed in {} ms and release {} bytes", event, duration, event.getSize());
+        updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), blocks);
+        ShuffleServerMetrics.incStorageSuccessCounter(storage.getStorageHost());
+      } else if (event.getRetryTimes() <= retryMax) {
+        event.increaseRetryTimes();
+        ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost());
+        event.getUnderEventHandler().handle(event);

Review Comment:
   I think this is wrong. Once a storage is selected for an event, every retry goes to that same storage, so the event never gets a chance to switch to a different storage.
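Here is a minimal sketch of the alternative. It assumes the selector may return a different storage depending on how many retries an event has already used. The storage is then chosen inside the retry path rather than once up front. `ReselectingFlusher` and its two functions are hypothetical. A queue-based re-push would behave the same way, as long as selection happens each time the event is dequeued.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

// Hypothetical flusher: the storage is chosen again on every attempt instead of
// being pinned to the event, so a failed write can fall back to another storage.
final class ReselectingFlusher {
  private final BiFunction<String, Integer, String> selector; // (appId, retryTimes) -> storage
  private final Predicate<String> writeTo;                     // storage -> did the write succeed?
  private final int retryMax;
  final List<String> triedStorages = new ArrayList<>();

  ReselectingFlusher(BiFunction<String, Integer, String> selector,
                     Predicate<String> writeTo, int retryMax) {
    this.selector = selector;
    this.writeTo = writeTo;
    this.retryMax = retryMax;
  }

  boolean flush(String appId) {
    for (int retryTimes = 0; retryTimes <= retryMax; retryTimes++) {
      String storage = selector.apply(appId, retryTimes); // fresh choice per attempt
      triedStorages.add(storage);
      if (writeTo.test(storage)) {
        return true;
      }
    }
    return false; // every attempt failed
  }
}
```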


