You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/31 19:16:40 UTC
[bookkeeper] branch master updated: ISSUE #611: restrict max
ensemble change numbers
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 125cda6 ISSUE #611: restrict max ensemble change numbers
125cda6 is described below
commit 125cda66931f344a071167b9dd8e21d18fabb79a
Author: Arvin <ar...@gmail.com>
AuthorDate: Wed Jan 31 11:16:33 2018 -0800
ISSUE #611: restrict max ensemble change numbers
Descriptions of the changes in this PR:
Add max ensemble change number check
Master Issue: #611
Author: Arvin <ar...@gmail.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>
This closes #925 from ArvinDevel/restrict_ensemble_num, closes #611
---
.../org/apache/bookkeeper/client/BookKeeper.java | 9 +++
.../org/apache/bookkeeper/client/LedgerHandle.java | 17 +++-
.../bookkeeper/conf/ClientConfiguration.java | 21 +++++
.../bookkeeper/client/MockBookKeeperTestCase.java | 81 +++++++++++++++++--
.../client/TestMaxEnsembleChangeNum.java | 92 ++++++++++++++++++++++
.../client/api/BookKeeperBuildersTest.java | 7 +-
6 files changed, 214 insertions(+), 13 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 79dd625..ca24a99 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -709,6 +709,15 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
/**
+ * Get the disableEnsembleChangeFeature.
+ *
+ * @return disableEnsembleChangeFeature for the BookKeeper instance.
+ */
+ Feature getDisableEnsembleChangeFeature() {
+ return disableEnsembleChangeFeature;
+ }
+
+ /**
* Get the BookieClient, currently used for doing bookie recovery.
*
* @return BookieClient for the BookKeeper instance.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index ba6b3fc..dc100c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -21,6 +21,7 @@
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.client.api.BKException.Code.ClientClosedException;
+import static org.apache.bookkeeper.client.api.BKException.Code.WriteException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
@@ -120,6 +121,7 @@ public class LedgerHandle implements WriteHandle {
final AtomicInteger blockAddCompletions = new AtomicInteger(0);
final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
+ final int maxAllowedEnsembleChanges;
Queue<PendingAddOp> pendingAddOps;
ExplicitLacFlushPolicy explicitLacFlushPolicy;
@@ -191,6 +193,7 @@ public class LedgerHandle implements WriteHandle {
}
};
+ maxAllowedEnsembleChanges = bk.getConf().getMaxAllowedEnsembleChanges();
ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
@@ -1520,7 +1523,7 @@ public class LedgerHandle implements WriteHandle {
continue;
}
try {
- BookieSocketAddress newBookie = bk.bookieWatcher.replaceBookie(
+ BookieSocketAddress newBookie = bk.getBookieWatcher().replaceBookie(
metadata.getEnsembleSize(),
metadata.getWriteQuorumSize(),
metadata.getAckQuorumSize(),
@@ -1552,8 +1555,7 @@ public class LedgerHandle implements WriteHandle {
void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
int curBlockAddCompletions = blockAddCompletions.incrementAndGet();
-
- if (bk.disableEnsembleChangeFeature.isAvailable()) {
+ if (bk.getDisableEnsembleChangeFeature().isAvailable()) {
blockAddCompletions.decrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
@@ -1565,6 +1567,15 @@ public class LedgerHandle implements WriteHandle {
int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();
+ // when the number of ensemble changes exceeds the configured maximum, stop changing the ensemble and close the handle
+ if (curNumEnsembleChanges > maxAllowedEnsembleChanges){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ledger {} reaches max allowed ensemble change number {}",
+ ledgerId, maxAllowedEnsembleChanges);
+ }
+ handleUnrecoverableErrorDuringAdd(WriteException);
+ return;
+ }
synchronized (metadata) {
try {
EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 424ae34..51aa699 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -116,6 +116,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
protected static final String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled";
// Add Parameters
protected static final String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange";
+ protected static final String MAX_ALLOWED_ENSEMBLE_CHANGES = "maxNumEnsembleChanges";
// Timeout Setting
protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec";
protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec";
@@ -1666,6 +1667,26 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
return this;
}
+ /**
+ * Get the max allowed ensemble change number.
+ *
+ * @return the maximum number of ensemble changes allowed; defaults to Integer.MAX_VALUE, which effectively disables the limit.
+ */
+ public int getMaxAllowedEnsembleChanges() {
+ return getInt(MAX_ALLOWED_ENSEMBLE_CHANGES, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Set the max allowed ensemble change number.
+ *
+ * @param num
+ * the maximum number of ensemble changes allowed
+ * @return client configuration.
+ */
+ public ClientConfiguration setMaxAllowedEnsembleChanges(int num) {
+ setProperty(MAX_ALLOWED_ENSEMBLE_CHANGES, num);
+ return this;
+ }
/**
* Option to use Netty Pooled ByteBufs.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 1ebfb8f..d9cecda 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -17,9 +17,13 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,7 +33,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -44,6 +51,7 @@ import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -82,6 +90,10 @@ public abstract class MockBookKeeperTestCase {
protected ConcurrentSkipListSet<Long> fencedLedgers;
protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData;
+ List<BookieSocketAddress> failedBookies;
+ Set<BookieSocketAddress> availableBookies;
+ private int lastIndexForBK;
+
private Map<BookieSocketAddress, Map<Long, MockEntry>> getMockLedgerContents(long ledgerId) {
return mockLedgerData.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>());
}
@@ -90,7 +102,11 @@ public abstract class MockBookKeeperTestCase {
return getMockLedgerContents(ledgerId).computeIfAbsent(bookieSocketAddress, addr -> new ConcurrentHashMap<>());
}
- private MockEntry getMockLedgerEntry(long ledgerId, BookieSocketAddress bookieSocketAddress, long entryId) {
+ private MockEntry getMockLedgerEntry(long ledgerId,
+ BookieSocketAddress bookieSocketAddress, long entryId) throws BKException{
+ if (failedBookies.contains(bookieSocketAddress)) {
+ throw BKException.create(NoBookieAvailableException);
+ }
return getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).get(entryId);
}
@@ -124,9 +140,13 @@ public abstract class MockBookKeeperTestCase {
NullStatsLogger nullStatsLogger = setupLoggers();
+ failedBookies = new ArrayList<>();
+ availableBookies = new HashSet<>();
+
when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock());
when(bk.isClosed()).thenReturn(false);
when(bk.getBookieWatcher()).thenReturn(bookieWatcher);
+ when(bk.getDisableEnsembleChangeFeature()).thenReturn(mock(Feature.class));
when(bk.getExplicitLacInterval()).thenReturn(0);
when(bk.getMainWorkerPool()).thenReturn(executor);
when(bk.getBookieClient()).thenReturn(bookieClient);
@@ -145,6 +165,7 @@ public abstract class MockBookKeeperTestCase {
setupRemoveLedgerMetadata();
setupRegisterLedgerMetadataListener();
setupBookieWatcherForNewEnsemble();
+ setupBookieWatcherForEnsembleChange();
setupBookieClientReadEntry();
setupBookieClientAddEntry();
}
@@ -188,6 +209,17 @@ public abstract class MockBookKeeperTestCase {
when(bk.isClosed()).thenReturn(true);
}
+ protected void killBookie(BookieSocketAddress killedBookieSocketAddress) {
+ failedBookies.add(killedBookieSocketAddress);
+ availableBookies.remove(killedBookieSocketAddress);
+ }
+
+ protected BookieSocketAddress startNewBookie() {
+ BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++);
+ availableBookies.add(address);
+ return address;
+ }
+
protected BookieSocketAddress generateBookieSocketAddress(int index) {
return new BookieSocketAddress("localhost", 1111 + index);
}
@@ -197,6 +229,8 @@ public abstract class MockBookKeeperTestCase {
for (int i = 0; i < ensembleSize; i++) {
ensemble.add(generateBookieSocketAddress(i));
}
+ availableBookies.addAll(ensemble);
+ lastIndexForBK = ensembleSize;
return ensemble;
}
@@ -213,6 +247,25 @@ public abstract class MockBookKeeperTestCase {
});
}
+ private void setupBookieWatcherForEnsembleChange() throws BKException.BKNotEnoughBookiesException {
+ when(bookieWatcher.replaceBookie(anyInt(), anyInt(), anyInt(), anyMap(), anyList(), anyInt(), anySet()))
+ .thenAnswer((Answer<BookieSocketAddress>) new Answer<BookieSocketAddress>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public BookieSocketAddress answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ List<BookieSocketAddress> existingBookies = (List<BookieSocketAddress>) args[4];
+ Set<BookieSocketAddress> excludeBookies = (Set<BookieSocketAddress>) args[6];
+ excludeBookies.addAll(existingBookies);
+ Set<BookieSocketAddress> remainBookies = new HashSet<BookieSocketAddress>(availableBookies);
+ remainBookies.removeAll(excludeBookies);
+ if (remainBookies.iterator().hasNext()) {
+ return remainBookies.iterator().next();
+ }
+ throw BKException.create(BKException.Code.NotEnoughBookiesException);
+ }
+ });
+ }
private void submit(Runnable operation) {
try {
scheduler.submit(operation);
@@ -343,7 +396,14 @@ public abstract class MockBookKeeperTestCase {
LOG.error("Initialize macManager fail", gse);
}
fencedLedgers.add(ledgerId);
- MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+ MockEntry mockEntry = null;
+ try {
+ mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+ } catch (BKException bke) {
+ LOG.info("readEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId,
+ bookieSocketAddress);
+ callback.readEntryComplete(bke.getCode(), ledgerId, entryId, null, args[5]);
+ }
if (mockEntry != null) {
LOG.info("readEntryAndFenceLedger - found mock entry {}@{} at {}", entryId, ledgerId,
bookieSocketAddress);
@@ -377,7 +437,14 @@ public abstract class MockBookKeeperTestCase {
} catch (GeneralSecurityException gse){
LOG.error("Initialize macManager fail", gse);
}
- MockEntry mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+ MockEntry mockEntry = null;
+ try {
+ mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId);
+ } catch (BKException bke) {
+ LOG.info("readEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId,
+ bookieSocketAddress);
+ callback.readEntryComplete(bke.getCode(), ledgerId, entryId, null, args[5]);
+ }
if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress);
ByteBuf entry = macManager.computeDigestAndPackageForSending(entryId,
@@ -441,9 +508,13 @@ public abstract class MockBookKeeperTestCase {
callback.writeComplete(BKException.Code.LedgerFencedException,
ledgerId, entryId, bookieSocketAddress, ctx);
} else {
+ if (failedBookies.contains(bookieSocketAddress)) {
+ callback.writeComplete(NoBookieAvailableException, ledgerId, entryId, bookieSocketAddress, ctx);
+ return;
+ }
if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) {
- registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
- new byte[0], BookieProtocol.INVALID_ENTRY_ID);
+ registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress,
+ new byte[0], BookieProtocol.INVALID_ENTRY_ID);
}
registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId);
callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
new file mode 100644
index 0000000..2a07e1d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxEnsembleChangeNum.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.apache.bookkeeper.client.api.BKException.Code.WriteException;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import org.apache.bookkeeper.client.api.WriteHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.junit.Test;
+
+/**
+ * Tests that the number of ensemble changes is capped by the configured maximum.
+ */
+public class TestMaxEnsembleChangeNum extends MockBookKeeperTestCase {
+
+ private static final byte[] password = new byte[5];
+ private static final byte[] data = new byte[5];
+
+ @Test
+ public void testChangeEnsembleMaxNumWithWriter() throws Exception {
+ long lId;
+ int numEntries = 5;
+ int changeNum = 5;
+ setBookkeeperConfig(new ClientConfiguration().setDelayEnsembleChange(false).setMaxAllowedEnsembleChanges(5));
+ try (WriteHandle writer = result(newCreateLedgerOp()
+ .withAckQuorumSize(3)
+ .withWriteQuorumSize(3)
+ .withEnsembleSize(3)
+ .withPassword(password)
+ .execute())) {
+ lId = writer.getId();
+ //first fragment
+ for (int i = 0; i < numEntries; i++) {
+ result(writer.append(ByteBuffer.wrap(data)));
+ }
+ assertEquals("There should be zero ensemble change",
+ 1, getLedgerMetadata(lId).getEnsembles().size());
+
+ simulateEnsembleChangeWithWriter(changeNum, numEntries, writer);
+
+ // one more ensemble change
+ startNewBookie();
+ killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
+ // add failure
+ try {
+ result(writer.append(ByteBuffer.wrap(data)));
+ fail("should not come to here");
+ } catch (BKException exception){
+ assertEquals(exception.getCode(), WriteException);
+ }
+ }
+ }
+
+ private void simulateEnsembleChangeWithWriter(int changeNum, int numEntries, WriteHandle writer) throws Exception{
+
+ int expectedSize = writer.getLedgerMetadata().getAllEnsembles().size() + 1;
+ //kill bookie and add again
+ for (int num = 0; num < changeNum; num++){
+ startNewBookie();
+
+ killBookie(writer.getLedgerMetadata().getEnsembleAt(writer.getLastAddConfirmed()).get(0));
+ for (int i = 0; i < numEntries; i++) {
+ result(writer.append(ByteBuffer.wrap(data)));
+ }
+ // ensure that an ensemble change happened
+ assertEquals("There should be one ensemble change",
+ expectedSize + num, writer.getLedgerMetadata().getAllEnsembles().size());
+ }
+ }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index f6b8b07..ddd9d4e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -25,11 +25,9 @@ import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.bookkeeper.client.BKException.BKClientClosedException;
import org.apache.bookkeeper.client.BKException.BKIncorrectParameterException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
@@ -39,7 +37,6 @@ import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.client.MockBookKeeperTestCase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol;
-
import org.junit.Test;
/**
@@ -340,8 +337,8 @@ public class BookKeeperBuildersTest extends MockBookKeeperTestCase {
ledgerMetadata.getEnsembles().values().forEach(bookieAddressList -> {
bookieAddressList.forEach(bookieAddress -> {
- registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
- registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
+ registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieAddress, entryData, -1);
+ registerMockEntryForRead(ledgerId, 0, bookieAddress, entryData, -1);
});
});
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.