You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2016/11/12 02:35:33 UTC

bookkeeper git commit: BOOKKEEPER-961: Assign read/write requests for same ledger to a single thread

Repository: bookkeeper
Updated Branches:
  refs/heads/master 026ef10e1 -> 7673febb6


BOOKKEEPER-961: Assign read/write requests for same ledger to a single thread

When entries for the same ledger are processed by the bookie we should avoid
the reordering of the request. Currently, if multiple read/write threads are
configured, the requests will be passed to the executor and writes for same
ledger will be spread across multiple threads.

This poses 2 issues:
 1. Mutex contention to access the LedgerDescriptor
 2. If the client receives add-entry acks out of order it has anyway to wait
    for the acks of previous entries before acknowledging the whole sequence
    to the application. In practice, the reordering is increasing the latency
   experienced by the application.

Author: Matteo Merli <mm...@yahoo-inc.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #69 from merlimat/bk-fixed-ledger-thread


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/7673febb
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/7673febb
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/7673febb

Branch: refs/heads/master
Commit: 7673febb6519fedb96320ec98db71601826a63b1
Parents: 026ef10
Author: Matteo Merli <mm...@apache.org>
Authored: Fri Nov 11 18:35:13 2016 -0800
Committer: Matteo Merli <mm...@apache.org>
Committed: Fri Nov 11 18:35:13 2016 -0800

----------------------------------------------------------------------
 .../proto/BookieRequestProcessor.java           | 33 +++++++-------------
 .../bookkeeper/proto/PacketProcessorBase.java   |  6 ++--
 .../bookkeeper/proto/PacketProcessorBaseV3.java |  3 +-
 .../bookkeeper/proto/ReadEntryProcessorV3.java  |  4 +--
 .../bookkeeper/proto/WriteEntryProcessorV3.java |  4 +--
 5 files changed, 21 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7673febb/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 1608328..4dec39a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,15 +20,12 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,12 +52,12 @@ public class BookieRequestProcessor implements RequestProcessor {
     /**
      * The threadpool used to execute all read entry requests issued to this server.
      */
-    private final ExecutorService readThreadPool;
+    private final OrderedSafeExecutor readThreadPool;
 
     /**
      * The threadpool used to execute all add entry requests issued to this server.
      */
-    private final ExecutorService writeThreadPool;
+    private final OrderedSafeExecutor writeThreadPool;
 
     // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
@@ -74,12 +71,8 @@ public class BookieRequestProcessor implements RequestProcessor {
                                   StatsLogger statsLogger) {
         this.serverCfg = serverCfg;
         this.bookie = bookie;
-        this.readThreadPool =
-            createExecutor(this.serverCfg.getNumReadWorkerThreads(),
-                           "BookieReadThread-" + serverCfg.getBookiePort() + "-%d");
-        this.writeThreadPool =
-            createExecutor(this.serverCfg.getNumAddWorkerThreads(),
-                           "BookieWriteThread-" + serverCfg.getBookiePort() + "-%d");
+        this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverCfg.getBookiePort());
+        this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverCfg.getBookiePort());
         // Expose Stats
         this.statsEnabled = serverCfg.isStatisticsEnabled();
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -94,16 +87,15 @@ public class BookieRequestProcessor implements RequestProcessor {
         shutdownExecutor(readThreadPool);
     }
 
-    private ExecutorService createExecutor(int numThreads, String nameFormat) {
+    private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat) {
         if (numThreads <= 0) {
             return null;
         } else {
-            return Executors.newFixedThreadPool(numThreads,
-                new ThreadFactoryBuilder().setNameFormat(nameFormat).build());
+            return OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).build();
         }
     }
 
-    private void shutdownExecutor(ExecutorService service) {
+    private void shutdownExecutor(OrderedSafeExecutor service) {
         if (null != service) {
             service.shutdown();
         }
@@ -160,7 +152,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submit(write);
+            writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), write);
         }
     }
 
@@ -169,7 +161,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == readThreadPool) {
             read.run();
         } else {
-            readThreadPool.submit(read);
+            readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
         }
     }
 
@@ -178,7 +170,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == writeThreadPool) {
             write.run();
         } else {
-            writeThreadPool.submit(write);
+            writeThreadPool.submitOrdered(r.getLedgerId(), write);
         }
     }
 
@@ -187,8 +179,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         if (null == readThreadPool) {
             read.run();
         } else {
-            readThreadPool.submit(read);
+            readThreadPool.submitOrdered(r.getLedgerId(), read);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7673febb/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index a2dc4d8..681f6c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -19,15 +19,15 @@ package org.apache.bookkeeper.proto;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class PacketProcessorBase implements Runnable {
+abstract class PacketProcessorBase extends SafeRunnable {
     private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
     final Request request;
     final Channel channel;
@@ -64,7 +64,7 @@ abstract class PacketProcessorBase implements Runnable {
     }
 
     @Override
-    public void run() {
+    public void safeRun() {
         if (!isVersionCompatible()) {
             sendResponse(BookieProtocol.EBADVERSION,
                          ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request),

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7673febb/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 9ffca53..85ec6cb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -28,9 +28,10 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.jboss.netty.channel.Channel;
 
-public abstract class PacketProcessorBaseV3 {
+public abstract class PacketProcessorBaseV3 extends SafeRunnable {
 
     final Request request;
     final Channel channel;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7673febb/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index 4073d41..b9037c1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
 
-class ReadEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
 
     private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
 
@@ -148,7 +148,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     }
 
     @Override
-    public void run() {
+    public void safeRun() {
         ReadResponse readResponse = getReadResponse();
         sendResponse(readResponse);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/7673febb/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index 6517d8f..242ed81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -36,7 +36,7 @@ import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class WriteEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
     private final static Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class);
 
     public WriteEntryProcessorV3(Request request, Channel channel,
@@ -138,7 +138,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
     }
 
     @Override
-    public void run() {
+    public void safeRun() {
         AddResponse addResponse = getAddResponse();
         if (null != addResponse) {
             // This means there was an error and we should send this back.