You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/04 22:10:14 UTC

[GitHub] sijie closed pull request #720: Fixed simultaneous reads on same ledger/entry with v2 protocol

sijie closed pull request #720: Fixed simultaneous reads on same ledger/entry with v2 protocol
URL: https://github.com/apache/bookkeeper/pull/720
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not show it otherwise:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index d470fc004..3aeddb24b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,46 +20,6 @@
 
 import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.DefaultEventLoopGroup;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.local.LocalChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.CorruptedFrameException;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.Certificate;
@@ -121,6 +81,48 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
 /**
  * This class manages all details of connection to a particular bookie. It also
  * has reconnect logic if a connection to a bookie fails.
@@ -155,6 +157,11 @@
     private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects =
         new ConcurrentHashMap<CompletionKey, CompletionValue>();
 
+    // Map that hold duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
+    // read request for the same ledgerId/entryId
+    private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
+        LinkedListMultimap.create();
+
     private final StatsLogger statsLogger;
     private final OpStatsLogger readEntryOpLogger;
     private final OpStatsLogger readTimeoutOpLogger;
@@ -604,14 +611,13 @@ public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
                     .build();
         }
 
-        CompletionValue completion = new ReadCompletion(completionKey,
-                                                        cb, ctx,
-                                                        ledgerId, entryId);
-        if (completionObjects.putIfAbsent(
-                    completionKey, completion) != null) {
-            // We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
-            completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
-            return;
+        ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
+        CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
+        if (existingValue != null) {
+            // There's a pending read request on same ledger/entry. Use the multimap to track all of them
+            synchronized (completionObjectsV2Conflicts) {
+                completionObjectsV2Conflicts.put(completionKey, readCompletion);
+            }
         }
 
         writeAndFlush(channel, completionKey, request);
@@ -726,15 +732,13 @@ private void readEntryInternal(final long ledgerId,
                     .build();
         }
 
-        CompletionValue completion = new ReadCompletion(completionKey, cb,
-                                                        ctx, ledgerId, entryId);
-        CompletionValue existingValue = completionObjects.putIfAbsent(
-                completionKey, completion);
+        ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
+        CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
         if (existingValue != null) {
-            // There's a pending read request on same ledger/entry. This is not supported in V2 protocol
-            LOG.warn("Failing concurrent request to read at ledger: {} entry: {}", ledgerId, entryId);
-            completion.errorOut(BKException.Code.UnexpectedConditionException);
-            return;
+            // There's a pending read request on same ledger/entry. Use the multimap to track all of them
+            synchronized (completionObjectsV2Conflicts) {
+                completionObjectsV2Conflicts.put(completionKey, readCompletion);
+            }
         }
 
         writeAndFlush(channel, completionKey, request);
@@ -868,6 +872,16 @@ void errorOut(final CompletionKey key, final int rc) {
         CompletionValue completion = completionObjects.remove(key);
         if (completion != null) {
             completion.errorOut(rc);
+        } else {
+            // If there's no completion object here, try in the multimap
+            synchronized (completionObjectsV2Conflicts) {
+                if (completionObjectsV2Conflicts.containsKey(key)) {
+                    completion = completionObjectsV2Conflicts.get(key).get(0);
+                    completionObjectsV2Conflicts.remove(key, completion);
+
+                    completion.errorOut(rc);
+                }
+            }
         }
     }
 
@@ -879,13 +893,17 @@ void errorOut(final CompletionKey key, final int rc) {
      */
 
     void errorOutOutstandingEntries(int rc) {
-
         // DO NOT rewrite these using Map.Entry iterations. We want to iterate
         // on keys and see if we are successfully able to remove the key from
         // the map. Because the add and the read methods also do the same thing
         // in case they get a write failure on the socket. The one who
         // successfully removes the key from the map is the one responsible for
         // calling the application callback.
+        for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
+            while (completionObjectsV2Conflicts.get(key).size() > 0) {
+                errorOut(key, rc);
+            }
+        }
         for (CompletionKey key : completionObjects.keySet()) {
             errorOut(key, rc);
         }
@@ -990,8 +1008,17 @@ private void readV2Response(final BookieProtocol.Response response) {
         final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);
 
         final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
-        final CompletionValue completionValue = completionObjects.remove(key);
+        CompletionValue completionValue = completionObjects.remove(key);
         key.release();
+        if (completionValue == null) {
+            // If there's no completion object here, try in the multimap
+            synchronized (this) {
+                if (completionObjectsV2Conflicts.containsKey(key)) {
+                    completionValue = completionObjectsV2Conflicts.get(key).get(0);
+                    completionObjectsV2Conflicts.remove(key, completionValue);
+                }
+            }
+        }
 
         if (null == completionValue) {
             // Unexpected response, so log it. The txnId should have been present.
@@ -1001,14 +1028,11 @@ private void readV2Response(final BookieProtocol.Response response) {
             }
         } else {
             long orderingKey = completionValue.ledgerId;
+            final CompletionValue finalCompletionValue = completionValue;
 
-            executor.submitOrdered(orderingKey, new SafeRunnable() {
-                    @Override
-                    public void safeRun() {
-                        completionValue.handleV2Response(ledgerId, entryId,
-                                                         status, response);
-                        response.recycle();
-                    }
+            executor.submitOrdered(orderingKey, () -> {
+                    finalCompletionValue.handleV2Response(ledgerId, entryId, status, response);
+                    response.recycle();
                 });
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 3527575a0..547b36730 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -400,11 +400,11 @@ public void testReadHandleWithExplicitLAC() throws Exception {
         Assert.assertTrue(
                 "Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
                 (wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        // readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made   
+        // readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
         Assert.assertTrue(
                 "Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
                 (rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-        
+
         long explicitlac = rlh.readExplicitLastConfirmed();
         Assert.assertTrue(
                 "Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + " actual ExplicitLAC of rlh: " + explicitlac,
@@ -413,7 +413,7 @@ public void testReadHandleWithExplicitLAC() throws Exception {
         Assert.assertTrue(
                 "Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
                 (rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-        
+
         Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
         int entryId = numOfEntries;
         while (entries.hasMoreElements()) {
@@ -818,4 +818,68 @@ public void testReadEntryReleaseByteBufs() throws Exception {
             }
         }
     }
+
+    /**
+     * Tests that issuing multiple reads for the same entry at the same time works as expected.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testDoubleRead() throws Exception {
+        LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());
+
+        lh.addEntry("test".getBytes());
+
+        // Read the same entry more times asynchronously
+        final int N = 10;
+        final CountDownLatch latch = new CountDownLatch(N);
+        for (int i = 0; i < N; i++) {
+            lh.asyncReadEntries(0, 0, new ReadCallback() {
+                public void readComplete(int rc, LedgerHandle lh,
+                                         Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (rc == BKException.Code.OK) {
+                        latch.countDown();
+                    } else {
+                        fail("Read fail");
+                    }
+                }
+            }, null);
+        }
+
+        latch.await();
+    }
+
+    /**
+     * Tests that issuing multiple reads for the same entry at the same time works as expected.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testDoubleReadWithV2Protocol() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setUseV2WireProtocol(true);
+        BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
+        LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());
+
+        lh.addEntry("test".getBytes());
+
+        // Read the same entry more times asynchronously
+        final int N = 10;
+        final CountDownLatch latch = new CountDownLatch(N);
+        for (int i = 0; i < N; i++) {
+            lh.asyncReadEntries(0, 0, new ReadCallback() {
+                public void readComplete(int rc, LedgerHandle lh,
+                                         Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (rc == BKException.Code.OK) {
+                        latch.countDown();
+                    } else {
+                        fail("Read fail");
+                    }
+                }
+            }, null);
+        }
+
+        latch.await();
+        bkc.close();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services