You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/31 19:16:46 UTC

[GitHub] sijie closed pull request #925: Issue 611: restrict max ensemble change numbers

sijie closed pull request #925: Issue 611: restrict max ensemble change numbers
URL: https://github.com/apache/bookkeeper/pull/925
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (opened from a fork), its diff would
not otherwise be visible on GitHub, so it is supplied below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 79dd6254e..ca24a99bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -708,6 +708,15 @@ StatsLogger getStatsLogger() {
         return readLACSpeculativeRequestPolicy;
     }
 
+    /**
+     * Get the disableEnsembleChangeFeature.
+     *
+     * @return disableEnsembleChangeFeature for the BookKeeper instance.
+     */
+    Feature getDisableEnsembleChangeFeature() {
+        return disableEnsembleChangeFeature;
+    }
+
     /**
      * Get the BookieClient, currently used for doing bookie recovery.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index ba6b3fcf8..dc100c64e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.client;
 
 import static org.apache.bookkeeper.client.api.BKException.Code.ClientClosedException;
+import static org.apache.bookkeeper.client.api.BKException.Code.WriteException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -120,6 +121,7 @@
 
     final AtomicInteger blockAddCompletions = new AtomicInteger(0);
     final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
+    final int maxAllowedEnsembleChanges;
     Queue<PendingAddOp> pendingAddOps;
     ExplicitLacFlushPolicy explicitLacFlushPolicy;
 
@@ -191,6 +193,7 @@ public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
             }
         };
 
+        maxAllowedEnsembleChanges = bk.getConf().getMaxAllowedEnsembleChanges();
         ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
         lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
         lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
@@ -1520,7 +1523,7 @@ EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> fai
                     continue;
                 }
                 try {
-                    BookieSocketAddress newBookie = bk.bookieWatcher.replaceBookie(
+                    BookieSocketAddress newBookie = bk.getBookieWatcher().replaceBookie(
                         metadata.getEnsembleSize(),
                         metadata.getWriteQuorumSize(),
                         metadata.getAckQuorumSize(),
@@ -1552,8 +1555,7 @@ EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> fai
 
     void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
         int curBlockAddCompletions = blockAddCompletions.incrementAndGet();
-
-        if (bk.disableEnsembleChangeFeature.isAvailable()) {
+        if (bk.getDisableEnsembleChangeFeature().isAvailable()) {
             blockAddCompletions.decrementAndGet();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
@@ -1565,6 +1567,15 @@ void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies)
 
         int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
 
+        // when the ensemble changes are too frequent, close handle
+        if (curNumEnsembleChanges > maxAllowedEnsembleChanges){
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ledger {} reaches max allowed ensemble change number {}",
+                        ledgerId, maxAllowedEnsembleChanges);
+            }
+            handleUnrecoverableErrorDuringAdd(WriteException);
+            return;
+        }
         synchronized (metadata) {
             try {
                 EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 4d9c77478..e0dc360e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -79,6 +79,7 @@
     protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
     // Add Parameters
     protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
+    protected static final String MAX_ALLOWED_ENSEMBLE_CHANGES = "maxNumEnsembleChanges";
     // Timeout Setting
     protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
     protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
@@ -1618,6 +1619,26 @@ public ClientConfiguration setDisableEnsembleChangeFeatureName(String disableEns
         return this;
     }
 
+    /**
+     * Get the max allowed ensemble change number.
+     *
+     * @return value of MaxAllowedEnsembleChanges, default MAX_VALUE, indicating feature is disable.
+     */
+    public int getMaxAllowedEnsembleChanges() {
+        return getInt(MAX_ALLOWED_ENSEMBLE_CHANGES, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Set the max allowed ensemble change number.
+     *
+     * @param num
+     *          value of MaxAllowedEnsembleChanges
+     * @return client configuration.
+     */
+    public ClientConfiguration setMaxAllowedEnsembleChanges(int num) {
+        setProperty(MAX_ALLOWED_ENSEMBLE_CHANGES, num);
+        return this;
+    }
 
     /**
      * Option to use Netty Pooled ByteBufs.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 1ebfb8fa8..d9cecda9b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,9 +17,13 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -29,7 +33,10 @@
 import io.netty.buffer.Unpooled;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -44,6 +51,7 @@
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -82,6 +90,10 @@
     protected ConcurrentSkipListSet<Long> fencedLedgers;
     protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
 
+    List<BookieSocketAddress> failedBookies;
+    Set<BookieSocketAddress> availableBookies;
+    private int lastIndexForBK;
+
     private Map<BookieSocketAddress, Map<Long, MockEntry>> getMockLedgerContents(long ledgerId) {
         return mockLedgerData.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>());
     }
@@ -90,7 +102,11 @@
         return getMockLedgerContents(ledgerId).computeIfAbsent(bookieSocketAddress, addr -> new ConcurrentHashMap<>());
     }
 
-    private MockEntry getMockLedgerEntry(long ledgerId, BookieSocketAddress bookieSocketAddress, long entryId) {
+    private MockEntry getMockLedgerEntry(long ledgerId,
+                                         BookieSocketAddress bookieSocketAddress, long entryId) throws BKException{
+        if (failedBookies.contains(bookieSocketAddress)) {
+            throw BKException.create(NoBookieAvailableException);
+        }
         return getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).get(entryId);
     }
 
@@ -124,9 +140,13 @@ public void setup() throws Exception {
 
         NullStatsLogger nullStatsLogger = setupLoggers();
 
+        failedBookies = new ArrayList<>();
+        availableBookies = new HashSet<>();
+
         when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock());
         when(bk.isClosed()).thenReturn(false);
         when(bk.getBookieWatcher()).thenReturn(bookieWatcher);
+        when(bk.getDisableEnsembleChangeFeature()).thenReturn(mock(Feature.class));
         when(bk.getExplicitLacInterval()).thenReturn(0);
         when(bk.getMainWorkerPool()).thenReturn(executor);
         when(bk.getBookieClient()).thenReturn(bookieClient);
@@ -145,6 +165,7 @@ public void setup() throws Exception {
         setupRemoveLedgerMetadata();
         setupRegisterLedgerMetadataListener();
         setupBookieWatcherForNewEnsemble();
+        setupBookieWatcherForEnsembleChange();
         setupBookieClientReadEntry();
         setupBookieClientAddEntry();
     }
@@ -188,6 +209,17 @@ protected void closeBookkeeper() {
         when(bk.isClosed()).thenReturn(true);
     }
 
+    protected void killBookie(BookieSocketAddress killedBookieSocketAddress) {
+        failedBookies.add(killedBookieSocketAddress);
+        availableBookies.remove(killedBookieSocketAddress);
+    }
+
+    protected BookieSocketAddress startNewBookie() {
+        BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++);
+        availableBookies.add(address);
+        return address;
+    }
+
     protected BookieSocketAddress generateBookieSocketAddress(int index) {
         return new BookieSocketAddress("localhost", 1111 + index);
     }
@@ -197,6 +229,8 @@ protected BookieSocketAddress generateBookieSocketAddress(int index) {
         for (int i = 0; i < ensembleSize; i++) {
             ensemble.add(generateBookieSocketAddress(i));
         }
+        availableBookies.addAll(ensemble);
+        lastIndexForBK = ensembleSize;
         return ensemble;
     }
 
@@ -213,6 +247,25 @@ private void setupBookieWatcherForNewEnsemble() throws BKException.BKNotEnoughBo
             });
     }
 
+    private void setupBookieWatcherForEnsembleChange() throws BKException.BKNotEnoughBookiesException {
+        when(bookieWatcher.replaceBookie(anyInt(), anyInt(), anyInt(), anyMap(), anyList(), anyInt(), anySet()))
+                .thenAnswer((Answer<BookieSocketAddress>) new Answer<BookieSocketAddress>() {
+                    @Override
+                    @SuppressWarnings("unchecked")
+                    public BookieSocketAddress answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        List<BookieSocketAddress> existingBookies = (List<BookieSocketAddress>) args[4];
+                        Set<BookieSocketAddress> excludeBookies = (Set<BookieSocketAddress>) args[6];
+                        excludeBookies.addAll(existingBookies);
+                        Set<BookieSocketAddress> remainBookies = new HashSet<BookieSocketAddress>(availableBookies);
+                        remainBookies.removeAll(excludeBookies);
+                        if (remainBookies.iterator().hasNext()) {
+                            return remainBookies.iterator().next();
+                        }
+                        throw BKException.create(BKException.Code.NotEnoughBookiesException);
+                    }
+                });
+    }
     private void submit(Runnable operation) {
         try {
             scheduler.submit(operation);
@@ -343,7 +396,14 @@ protected void setupBookieClientReadEntry() {
                     LOG.error("Initialize macManager fail", gse);
                 }
                 fencedLedgers.add(ledgerId);
-                MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                MockEntry mockEntry = null;
+                try {
+                    mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                } catch (BKException bke) {
+                    LOG.info("readEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId,
+                            bookieSocketAddress);
+                    callback.readEntryComplete(bke.getCode(), ledgerId, entryId, null, args[5]);
+                }
                 if (mockEntry != null) {
                     LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", entryId, ledgerId,
                             bookieSocketAddress);
@@ -377,7 +437,14 @@ protected void setupBookieClientReadEntry() {
                 } catch (GeneralSecurityException gse){
                     LOG.error("Initialize macManager fail", gse);
                 }
-                MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                MockEntry mockEntry = null;
+                try {
+                    mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+                } catch (BKException bke) {
+                    LOG.info("readEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId,
+                            bookieSocketAddress);
+                    callback.readEntryComplete(bke.getCode(), ledgerId, entryId, null, args[5]);
+                }
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
                     ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId,
@@ -441,9 +508,13 @@ protected void setupBookieClientAddEntry() {
                     callback.writeComplete(BKException.Code.LedgerFencedException,
                         ledgerId, entryId, bookieSocketAddress, ctx);
                 } else {
+                    if (failedBookies.contains(bookieSocketAddress)) {
+                        callback.writeComplete(NoBookieAvailableException, ledgerId, entryId, bookieSocketAddress, ctx);
+                        return;
+                    }
                     if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
-                        registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
-                            new byte[0], BookieProtocol.INVALID_ENTRY_ID);
+                            registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
+                                    new byte[0], BookieProtocol.INVALID_ENTRY_ID);
                     }
                     registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
                     callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
new file mode 100644
index 000000000..2a07e1d63
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.client.api.BKException.Code.WriteException;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.junit.Test;
+
+/**
+ * Test ensemble change has a max num.
+ */
+public class TestMaxEnsembleChangeNum extends MockBookKeeperTestCase {
+
+    private static final byte[] password = new byte[5];
+    private static final byte[] data = new byte[5];
+
+    @Test
+    public void testChangeEnsembleMaxNumWithWriter() throws Exception {
+        long lId;
+        int numEntries = 5;
+        int changeNum = 5;
+        setBookkeeperConfig(new ClientConfiguration().setDelayEnsembleChange(false).setMaxAllowedEnsembleChanges(5));
+        try (WriteHandle writer = result(newCreateLedgerOp()
+                .withAckQuorumSize(3)
+                .withWriteQuorumSize(3)
+                .withEnsembleSize(3)
+                .withPassword(password)
+                .execute())) {
+            lId = writer.getId();
+            //first fragment
+            for (int i = 0; i < numEntries; i++) {
+                result(writer.append(ByteBuffer.wrap(data)));
+            }
+            assertEquals("There should be zero ensemble change",
+                    1, getLedgerMetadata(lId).getEnsembles().size());
+
+            simulateEnsembleChangeWithWriter(changeNum, numEntries, writer);
+
+            // one more ensemble change
+            startNewBookie();
+            killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
+            // add failure
+            try {
+                result(writer.append(ByteBuffer.wrap(data)));
+                fail("should not come to here");
+            } catch (BKException exception){
+                assertEquals(exception.getCode(), WriteException);
+            }
+        }
+    }
+
+    private void simulateEnsembleChangeWithWriter(int changeNum, int numEntries, WriteHandle writer) throws Exception{
+
+        int expectedSize = writer.getLedgerMetadata().getAllEnsembles().size() + 1;
+        //kill bookie and add again
+        for (int num = 0; num < changeNum; num++){
+            startNewBookie();
+
+            killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
+            for (int i = 0; i < numEntries; i++) {
+                result(writer.append(ByteBuffer.wrap(data)));
+            }
+            // ensure there is a ensemble changed
+            assertEquals("There should be one ensemble change",
+                    expectedSize + num, writer.getLedgerMetadata().getAllEnsembles().size());
+        }
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index f6b8b07aa..ddd9d4e98 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -25,11 +25,9 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.bookkeeper.client.BKException.BKClientClosedException;
 import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
@@ -39,7 +37,6 @@
 import org.apache.bookkeeper.client.MockBookKeeperTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.proto.BookieProtocol;
-
 import org.junit.Test;
 
 /**
@@ -340,8 +337,8 @@ public void testOpenLedgerNoRecovery() throws Exception {
 
         ledgerMetadata.getEnsembles().values().forEach(bookieAddressList -> {
             bookieAddressList.forEach(bookieAddress -> {
-                registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
-                registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
+                    registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+                    registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
             });
         });
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services