You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2021/07/07 01:53:40 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17749. Remove lock contention in SelectorPool of SocketIOWithTimeout (#3080)

This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 24b7808  HADOOP-17749. Remove lock contention in SelectorPool of SocketIOWithTimeout (#3080)
24b7808 is described below

commit 24b780820c244a5d4f204e8ce309ad3d8f2056da
Author: liangxs <li...@gmail.com>
AuthorDate: Tue Jul 6 09:11:03 2021 +0800

    HADOOP-17749. Remove lock contention in SelectorPool of SocketIOWithTimeout (#3080)
    
    (cherry picked from commit a5db6831bc674a24a3251cf1b20f22a4fd4fac9f)
---
 .../org/apache/hadoop/net/SocketIOWithTimeout.java | 103 +++++++++------------
 .../apache/hadoop/net/TestSocketIOWithTimeout.java |  79 ++++++++++++++++
 2 files changed, 124 insertions(+), 58 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
index 312a481..d117bb8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java
@@ -28,8 +28,9 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -48,8 +49,6 @@ abstract class SocketIOWithTimeout {
   private long timeout;
   private boolean closed = false;
   
-  private static SelectorPool selector = new SelectorPool();
-  
   /* A timeout value of 0 implies wait for ever. 
    * We should have a value of timeout that implies zero wait.. i.e. 
    * read or write returns immediately.
@@ -154,7 +153,7 @@ abstract class SocketIOWithTimeout {
       //now wait for socket to be ready.
       int count = 0;
       try {
-        count = selector.select(channel, ops, timeout);  
+        count = SelectorPool.select(channel, ops, timeout);
       } catch (IOException e) { //unexpected IOException.
         closed = true;
         throw e;
@@ -200,7 +199,7 @@ abstract class SocketIOWithTimeout {
         // we might have to call finishConnect() more than once
         // for some channels (with user level protocols)
         
-        int ret = selector.select((SelectableChannel)channel, 
+        int ret = SelectorPool.select(channel,
                                   SelectionKey.OP_CONNECT, timeoutLeft);
         
         if (ret > 0 && channel.finishConnect()) {
@@ -242,7 +241,7 @@ abstract class SocketIOWithTimeout {
    */
   void waitForIO(int ops) throws IOException {
     
-    if (selector.select(channel, ops, timeout) == 0) {
+    if (SelectorPool.select(channel, ops, timeout) == 0) {
       throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
                                                               ops)); 
     }
@@ -280,12 +279,17 @@ abstract class SocketIOWithTimeout {
    * This maintains a pool of selectors. These selectors are closed
    * once they are idle (unused) for a few seconds.
    */
-  private static class SelectorPool {
+  private static final class SelectorPool {
     
-    private static class SelectorInfo {
-      Selector              selector;
-      long                  lastActivityTime;
-      LinkedList<SelectorInfo> queue; 
+    private static final class SelectorInfo {
+      private final SelectorProvider provider;
+      private final Selector selector;
+      private long lastActivityTime;
+
+      private SelectorInfo(SelectorProvider provider, Selector selector) {
+        this.provider = provider;
+        this.selector = selector;
+      }
       
       void close() {
         if (selector != null) {
@@ -298,16 +302,11 @@ abstract class SocketIOWithTimeout {
       }    
     }
     
-    private static class ProviderInfo {
-      SelectorProvider provider;
-      LinkedList<SelectorInfo> queue; // lifo
-      ProviderInfo next;
-    }
+    private static ConcurrentHashMap<SelectorProvider, ConcurrentLinkedDeque
+        <SelectorInfo>> providerMap = new ConcurrentHashMap<>();
     
     private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
     
-    private ProviderInfo providerList = null;
-    
     /**
      * Waits on the channel with the given timeout using one of the 
      * cached selectors. It also removes any cached selectors that are
@@ -319,7 +318,7 @@ abstract class SocketIOWithTimeout {
      * @return
      * @throws IOException
      */
-    int select(SelectableChannel channel, int ops, long timeout) 
+    static int select(SelectableChannel channel, int ops, long timeout)
                                                    throws IOException {
      
       SelectorInfo info = get(channel);
@@ -385,35 +384,18 @@ abstract class SocketIOWithTimeout {
      * @return 
      * @throws IOException
      */
-    private synchronized SelectorInfo get(SelectableChannel channel) 
+    private static SelectorInfo get(SelectableChannel channel)
                                                          throws IOException {
-      SelectorInfo selInfo = null;
-      
       SelectorProvider provider = channel.provider();
-      
       // pick the list : rarely there is more than one provider in use.
-      ProviderInfo pList = providerList;
-      while (pList != null && pList.provider != provider) {
-        pList = pList.next;
-      }      
-      if (pList == null) {
-        //LOG.info("Creating new ProviderInfo : " + provider.toString());
-        pList = new ProviderInfo();
-        pList.provider = provider;
-        pList.queue = new LinkedList<SelectorInfo>();
-        pList.next = providerList;
-        providerList = pList;
-      }
-      
-      LinkedList<SelectorInfo> queue = pList.queue;
-      
-      if (queue.isEmpty()) {
+      ConcurrentLinkedDeque<SelectorInfo> infoQ = providerMap.computeIfAbsent(
+          provider, k -> new ConcurrentLinkedDeque<>());
+
+      SelectorInfo selInfo = infoQ.pollLast(); // last in first out
+      if (selInfo == null) {
         Selector selector = provider.openSelector();
-        selInfo = new SelectorInfo();
-        selInfo.selector = selector;
-        selInfo.queue = queue;
-      } else {
-        selInfo = queue.removeLast();
+        // selInfo will be put into infoQ after `#release()`
+        selInfo = new SelectorInfo(provider, selector);
       }
       
       trimIdleSelectors(Time.now());
@@ -426,34 +408,39 @@ abstract class SocketIOWithTimeout {
      * 
      * @param info
      */
-    private synchronized void release(SelectorInfo info) {
+    private static void release(SelectorInfo info) {
       long now = Time.now();
       trimIdleSelectors(now);
       info.lastActivityTime = now;
-      info.queue.addLast(info);
+      // SelectorInfos in queue are sorted by lastActivityTime
+      providerMap.get(info.provider).addLast(info);
     }
     
+    private static AtomicBoolean trimming = new AtomicBoolean(false);
+
     /**
      * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
      * traverse the whole list, just over the one that have crossed 
      * the timeout.
      */
-    private void trimIdleSelectors(long now) {
+    private static void trimIdleSelectors(long now) {
+      if (!trimming.compareAndSet(false, true)) {
+        return;
+      }
+
       long cutoff = now - IDLE_TIMEOUT;
-      
-      for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
-        if (pList.queue.isEmpty()) {
-          continue;
-        }
-        for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
-          SelectorInfo info = it.next();
-          if (info.lastActivityTime > cutoff) {
+      for (ConcurrentLinkedDeque<SelectorInfo> infoQ : providerMap.values()) {
+        SelectorInfo oldest;
+        while ((oldest = infoQ.peekFirst()) != null) {
+          if (oldest.lastActivityTime <= cutoff && infoQ.remove(oldest)) {
+            oldest.close();
+          } else {
             break;
           }
-          it.remove();
-          info.close();
         }
       }
+
+      trimming.set(false);
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
index 272eae7..c55f020 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java
@@ -24,6 +24,11 @@ import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.nio.channels.Pipe;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MultithreadedTestUtil;
@@ -187,6 +192,46 @@ public class TestSocketIOWithTimeout {
   }
 
   @Test
+  public void testSocketIOWithTimeoutByMultiThread() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    Runnable ioTask = () -> {
+      try {
+        Pipe pipe = Pipe.open();
+        try (Pipe.SourceChannel source = pipe.source();
+             InputStream in = new SocketInputStream(source, TIMEOUT);
+             Pipe.SinkChannel sink = pipe.sink();
+             OutputStream out = new SocketOutputStream(sink, TIMEOUT)) {
+
+          byte[] writeBytes = TEST_STRING.getBytes();
+          byte[] readBytes = new byte[writeBytes.length];
+          latch.await();
+
+          out.write(writeBytes);
+          doIO(null, out, TIMEOUT);
+
+          in.read(readBytes);
+          assertArrayEquals(writeBytes, readBytes);
+          doIO(in, null, TIMEOUT);
+        }
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    };
+
+    int threadCnt = 64;
+    ExecutorService threadPool = Executors.newFixedThreadPool(threadCnt);
+    for (int i = 0; i < threadCnt; ++i) {
+      threadPool.submit(ioTask);
+    }
+
+    Thread.sleep(1000);
+    latch.countDown();
+
+    threadPool.shutdown();
+    assertTrue(threadPool.awaitTermination(3, TimeUnit.SECONDS));
+  }
+
+  @Test
   public void testSocketIOWithTimeoutInterrupted() throws Exception {
     Pipe pipe = Pipe.open();
     final int timeout = TIMEOUT * 10;
@@ -223,4 +268,38 @@ public class TestSocketIOWithTimeout {
       ctx.stop();
     }
   }
+
+  @Test
+  public void testSocketIOWithTimeoutInterruptedByMultiThread()
+      throws Exception {
+    final int timeout = TIMEOUT * 10;
+    AtomicLong readCount = new AtomicLong();
+    AtomicLong exceptionCount = new AtomicLong();
+    Runnable ioTask = () -> {
+      try {
+        Pipe pipe = Pipe.open();
+        try (Pipe.SourceChannel source = pipe.source();
+             InputStream in = new SocketInputStream(source, timeout)) {
+          in.read();
+          readCount.incrementAndGet();
+        } catch (InterruptedIOException ste) {
+          exceptionCount.incrementAndGet();
+        }
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    };
+
+    int threadCnt = 64;
+    ExecutorService threadPool = Executors.newFixedThreadPool(threadCnt);
+    for (int i = 0; i < threadCnt; ++i) {
+      threadPool.submit(ioTask);
+    }
+    Thread.sleep(1000);
+    threadPool.shutdownNow();
+    threadPool.awaitTermination(1, TimeUnit.SECONDS);
+
+    assertEquals(0, readCount.get());
+    assertEquals(threadCnt, exceptionCount.get());
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org