You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/14 06:16:41 UTC

[GitHub] eolivelli closed pull request #1393: BP-14 forceLedger wire protocol server side implementation

eolivelli closed pull request #1393: BP-14 forceLedger wire protocol server side implementation
URL: https://github.com/apache/bookkeeper/pull/1393
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index d4a7d2ea8..bac941133 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -63,6 +63,7 @@ enum OperationType {
     READ_LAC = 7;
     GET_BOOKIE_INFO = 8;
     START_TLS = 9;
+    FORCE_LEDGER = 10;
 }
 
 /**
@@ -85,6 +86,7 @@ message Request {
     optional ReadLacRequest readLacRequest = 104;
     optional GetBookieInfoRequest getBookieInfoRequest = 105;
     optional StartTLSRequest startTLSRequest = 106;
+    optional ForceLedgerRequest forceLedgerRequest = 107;
 }
 
 message ReadRequest {
@@ -126,6 +128,10 @@ message WriteLacRequest {
     required bytes body = 4;
 }
 
+message ForceLedgerRequest {
+    required int64 ledgerId = 1;
+}
+
 message ReadLacRequest {
     required int64 ledgerId = 1;
 }
@@ -153,6 +159,7 @@ message Response {
     optional ReadLacResponse readLacResponse = 104;
     optional GetBookieInfoResponse getBookieInfoResponse = 105;
     optional StartTLSResponse startTLSResponse = 106;
+    optional ForceLedgerResponse forceLedgerResponse = 107;
 }
 
 message ReadResponse {
@@ -181,6 +188,11 @@ message WriteLacResponse {
     required int64 ledgerId = 2;
 }
 
+message ForceLedgerResponse {
+    required StatusCode status = 1;
+    required int64 ledgerId = 2;
+}
+
 message ReadLacResponse {
     required StatusCode status = 1;
     required int64 ledgerId = 2;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index d2ce94bbc..d488bc96e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -44,6 +44,8 @@
     // Stats
     String ADD_ENTRY_REQUEST = "ADD_ENTRY_REQUEST";
     String ADD_ENTRY = "ADD_ENTRY";
+    String FORCE_LEDGER_REQUEST = "FORCE_LEDGER_REQUEST";
+    String FORCE_LEDGER = "FORCE_LEDGER";
     String READ_ENTRY_REQUEST = "READ_ENTRY_REQUEST";
     String READ_ENTRY = "READ_ENTRY";
     String READ_ENTRY_SCHEDULING_DELAY = "READ_ENTRY_SCHEDULING_DELAY";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 76af6eb39..2b893876e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -120,7 +120,7 @@
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
-    static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
+    public static final long METAENTRY_ID_FORCE_LEDGER  = -0x4000;
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index edb89247d..2aebbb955 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -24,6 +24,8 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ADD_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CHANNEL_WRITE;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.FORCE_LEDGER_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
@@ -130,6 +132,8 @@
     private final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
+    final OpStatsLogger forceLedgerStats;
+    final OpStatsLogger forceLedgerRequestStats;
     final OpStatsLogger fenceReadRequestStats;
     final OpStatsLogger fenceReadEntryStats;
     final OpStatsLogger fenceReadWaitStats;
@@ -191,6 +195,8 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
         this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
         this.addRequestStats = statsLogger.getOpStatsLogger(ADD_ENTRY_REQUEST);
         this.readEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY);
+        this.forceLedgerStats = statsLogger.getOpStatsLogger(FORCE_LEDGER);
+        this.forceLedgerRequestStats = statsLogger.getOpStatsLogger(FORCE_LEDGER_REQUEST);
         this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
         this.fenceReadEntryStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_READ);
         this.fenceReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_FENCE_REQUEST);
@@ -258,6 +264,9 @@ public void processRequest(Object msg, Channel c) {
                 case READ_ENTRY:
                     processReadRequestV3(r, c);
                     break;
+                case FORCE_LEDGER:
+                    processForceLedgerRequestV3(r, c);
+                    break;
                 case AUTH:
                     LOG.info("Ignoring auth operation from client {}", c.remoteAddress());
                     BookkeeperProtocol.AuthMessage message = BookkeeperProtocol.AuthMessage
@@ -369,6 +378,44 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann
         }
     }
 
+    /**
+     * Schedules a v3 force-ledger request on the executor matching its priority.
+     *
+     * <p>Runs inline when that executor is {@code null}; if the executor rejects the task,
+     * an {@code ETOOMANYREQUESTS} response is sent back right away.
+     */
+    private void processForceLedgerRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        final ForceLedgerProcessorV3 processor = new ForceLedgerProcessorV3(r, c, this);
+        final OrderedExecutor executor =
+                RequestUtils.isHighPriority(r) ? highPriorityThreadPool : writeThreadPool;
+
+        if (executor == null) {
+            processor.run();
+            return;
+        }
+
+        final long ledgerId = r.getForceLedgerRequest().getLedgerId();
+        try {
+            executor.executeOrdered(ledgerId, processor);
+        } catch (RejectedExecutionException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to process request to force ledger {}. Too many pending requests",
+                          ledgerId);
+            }
+            final BookkeeperProtocol.StatusCode rejected = BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS;
+            final BookkeeperProtocol.ForceLedgerResponse.Builder body =
+                    BookkeeperProtocol.ForceLedgerResponse.newBuilder()
+                    .setLedgerId(ledgerId)
+                    .setStatus(rejected);
+            final BookkeeperProtocol.Response reply = BookkeeperProtocol.Response.newBuilder()
+                    .setHeader(processor.getHeader())
+                    .setStatus(rejected)
+                    .setForceLedgerResponse(body)
+                    .build();
+            processor.sendResponse(rejected, reply, forceLedgerRequestStats);
+        }
+    }
+
     private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
         ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
new file mode 100644
index 000000000..0c8ef01fa
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3.java
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles a single v3 force-ledger request: asks the bookie to force the ledger and
+ * answers the client once the bookie's write callback fires.
+ */
+class ForceLedgerProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(ForceLedgerProcessorV3.class);
+
+    public ForceLedgerProcessorV3(Request request, Channel channel,
+                             BookieRequestProcessor requestProcessor) {
+        super(request, channel, requestProcessor);
+    }
+
+    /**
+     * Submits the force-ledger operation to the bookie.
+     *
+     * <p>On the normal path the response is sent from the write callback, so this method
+     * returns {@code null}.
+     *
+     * @return an error response to send immediately (protocol version mismatch, or the bookie
+     *         call threw), or {@code null} when the callback is responsible for replying
+     */
+    private ForceLedgerResponse getForceLedgerResponse() {
+        final long startTimeNanos = MathUtils.nowInNano();
+        ForceLedgerRequest forceLedgerRequest = request.getForceLedgerRequest();
+        long ledgerId = forceLedgerRequest.getLedgerId();
+
+        final ForceLedgerResponse.Builder forceLedgerResponse = ForceLedgerResponse.newBuilder().setLedgerId(ledgerId);
+
+        if (!isVersionCompatible()) {
+            forceLedgerResponse.setStatus(StatusCode.EBADVERSION);
+            return forceLedgerResponse.build();
+        }
+
+        BookkeeperInternalCallbacks.WriteCallback wcb =
+                (int rc, long ledgerId1, long entryId, BookieSocketAddress addr, Object ctx) -> {
+
+            // Guava's Preconditions only substitutes %s placeholders (not SLF4J-style {}).
+            checkArgument(entryId == Bookie.METAENTRY_ID_FORCE_LEDGER,
+                    "entryId must be METAENTRY_ID_FORCE_LEDGER but was %s", entryId);
+
+            checkArgument(ledgerId1 == ledgerId,
+                    "ledgerId must be %s but was %s", ledgerId, ledgerId1);
+
+            if (BookieProtocol.EOK == rc) {
+                requestProcessor.getForceLedgerStats()
+                        .registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                                TimeUnit.NANOSECONDS);
+            } else {
+                requestProcessor.getForceLedgerStats()
+                        .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                                TimeUnit.NANOSECONDS);
+            }
+
+            // Translate the bookie return code into a wire status; anything unexpected maps to EUA.
+            StatusCode status;
+            switch (rc) {
+                case BookieProtocol.EOK:
+                    status = StatusCode.EOK;
+                    break;
+                case BookieProtocol.EIO:
+                    status = StatusCode.EIO;
+                    break;
+                default:
+                    status = StatusCode.EUA;
+                    break;
+            }
+            forceLedgerResponse.setStatus(status);
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(forceLedgerResponse.getStatus())
+                    .setForceLedgerResponse(forceLedgerResponse);
+            Response resp = response.build();
+            sendResponse(status, resp, requestProcessor.getForceLedgerRequestStats());
+        };
+
+        try {
+            requestProcessor.getBookie().forceLedger(ledgerId, wcb, channel);
+        } catch (Throwable t) {
+            logger.error("Unexpected exception while forcing ledger {} : ", ledgerId, t);
+            // some bad request which cause unexpected exception
+            forceLedgerResponse.setStatus(StatusCode.EBADREQ);
+            return forceLedgerResponse.build();
+        }
+
+        // The request was handed to the bookie; the write callback sends the response.
+        return null;
+    }
+
+    /**
+     * Runs the request and, if it failed before reaching the callback, sends the error response.
+     */
+    @Override
+    public void safeRun() {
+        ForceLedgerResponse forceLedgerResponse = getForceLedgerResponse();
+        if (null != forceLedgerResponse) {
+            Response.Builder response = Response.newBuilder()
+                    .setHeader(getHeader())
+                    .setStatus(forceLedgerResponse.getStatus())
+                    .setForceLedgerResponse(forceLedgerResponse);
+            Response resp = response.build();
+            sendResponse(forceLedgerResponse.getStatus(), resp, requestProcessor.getForceLedgerRequestStats());
+        }
+    }
+
+    /**
+     * Renders the request through {@link RequestUtils#toSafeString(Request)} rather than the
+     * protobuf default {@code toString()}.
+     */
+    @Override
+    public String toString() {
+        return RequestUtils.toSafeString(request);
+    }
+}
+
+
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index fb6116529..d384c817a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -89,6 +89,11 @@ public static String toSafeString(BookkeeperProtocol.Request request) {
             stringHelper.add("ledgerId", writeLacRequest.getLedgerId());
             stringHelper.add("lac", writeLacRequest.getLac());
             return stringHelper.toString();
+        } else if (request.hasForceLedgerRequest()) {
+            BookkeeperProtocol.ForceLedgerRequest forceLedgerRequest = request.getForceLedgerRequest();
+            includeHeaderFields(stringHelper, header);
+            stringHelper.add("ledgerId", forceLedgerRequest.getLedgerId());
+            return stringHelper.toString();
         } else {
             return request.toString();
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
new file mode 100644
index 000000000..90a51c89e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ForceLedgerProcessorV3}.
+ */
+public class ForceLedgerProcessorV3Test {
+
+    private Request request;
+    private ForceLedgerProcessorV3 processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() {
+        request = Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setTxnId(System.currentTimeMillis())
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                // header operation matches the ForceLedgerRequest payload set below
+                .setOperation(OperationType.FORCE_LEDGER)
+                .build())
+            .setForceLedgerRequest(ForceLedgerRequest.newBuilder()
+                .setLedgerId(System.currentTimeMillis())
+                .build())
+            .build();
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getForceLedgerStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger"));
+        when(requestProcessor.getForceLedgerRequestStats())
+            .thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("force_ledger_request"));
+        processor = new ForceLedgerProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    /**
+     * A successful force-ledger callback must produce exactly one EOK response on the channel.
+     */
+    @Test
+    public void testForceLedger() throws Exception {
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        // the bookie mock reports rc 0 to the write callback synchronously
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(1);
+
+            wc.writeComplete(
+                0,
+                request.getForceLedgerRequest().getLedgerId(),
+                Bookie.METAENTRY_ID_FORCE_LEDGER,
+                null,
+                null);
+            return null;
+        }).when(bookie).forceLedger(
+            eq(request.getForceLedgerRequest().getLedgerId()),
+            any(WriteCallback.class),
+            same(channel));
+
+        // capture whatever the processor writes back to the channel
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .forceLedger(eq(request.getForceLedgerRequest().getLedgerId()),
+                    any(WriteCallback.class), same(channel));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+        assertEquals(StatusCode.EOK, response.getForceLedgerResponse().getStatus());
+        assertEquals(request.getForceLedgerRequest().getLedgerId(),
+            response.getForceLedgerResponse().getLedgerId());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services