You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/17 23:44:10 UTC
bookkeeper git commit: BOOKKEEPER-912: Allow EnsemblePlacementPolicy
to choose bookies using ledger custom data (multitenancy support)
Repository: bookkeeper
Updated Branches:
refs/heads/master 9359d682a -> 95ea4815b
BOOKKEEPER-912: Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)
Author: eolivelli <eo...@gmail.com>
Reviewers: sijie@apache.org <si...@apache.org>
Closes #68 from eolivelli/BOOKKEEPER-912 and squashes the following commits:
6c1eaca [eolivelli] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)
7c0ab37 [eolivelli] BOOKKEEPER-912 Allow EnsemblePlacementPolicy to choose bookies using ledger custom data (multitenancy support)
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/95ea4815
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/95ea4815
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/95ea4815
Branch: refs/heads/master
Commit: 95ea4815b5313ea2ab02e5192c9f577d1c0fe24e
Parents: 9359d68
Author: eolivelli <eo...@gmail.com>
Authored: Thu Nov 17 15:44:06 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Nov 17 15:44:06 2016 -0800
----------------------------------------------------------------------
.../LocalBookieEnsemblePlacementPolicy.java | 6 +-
.../apache/bookkeeper/client/BookieWatcher.java | 13 +-
.../client/DefaultEnsemblePlacementPolicy.java | 9 +-
.../client/EnsemblePlacementPolicy.java | 30 ++--
.../bookkeeper/client/LedgerCreateOp.java | 3 +-
.../apache/bookkeeper/client/LedgerHandle.java | 4 +-
.../RackawareEnsemblePlacementPolicy.java | 20 +--
.../RackawareEnsemblePlacementPolicyImpl.java | 8 +-
.../RegionAwareEnsemblePlacementPolicy.java | 6 +-
.../GenericEnsemblePlacementPolicyTest.java | 148 +++++++++++++++++++
.../TestRackawareEnsemblePlacementPolicy.java | 22 +--
.../TestRegionAwareEnsemblePlacementPolicy.java | 58 ++++----
12 files changed, 235 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 508511b..e7bfe94 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -74,7 +74,9 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
throw new BKNotEnoughBookiesException();
}
@@ -90,7 +92,7 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
@Override
public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
if (ensembleSize > 1) {
throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie");
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index b8d8951..cec6920 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -49,6 +49,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import java.util.Map;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -255,18 +256,19 @@ class BookieWatcher implements Watcher, ChildrenCallback {
* @return list of bookies for new ensemble.
* @throws BKNotEnoughBookiesException
*/
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize)
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata)
throws BKNotEnoughBookiesException {
try {
// we try to only get from the healthy bookies first
return placementPolicy.newEnsemble(ensembleSize,
- writeQuorumSize, ackQuorumSize, new HashSet<BookieSocketAddress>(
+ writeQuorumSize, ackQuorumSize, customMetadata, new HashSet<BookieSocketAddress>(
quarantinedBookies.asMap().keySet()));
} catch (BKNotEnoughBookiesException e) {
if (logger.isDebugEnabled()) {
logger.debug("Not enough healthy bookies available, using quarantined bookies");
}
- return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, EMPTY_SET);
+ return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, EMPTY_SET);
}
}
@@ -280,6 +282,7 @@ class BookieWatcher implements Watcher, ChildrenCallback {
* @throws BKNotEnoughBookiesException
*/
public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ Map<String, byte[]> customMetadata,
List<BookieSocketAddress> existingBookies, int bookieIdx,
Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
@@ -288,13 +291,13 @@ class BookieWatcher implements Watcher, ChildrenCallback {
// we exclude the quarantined bookies also first
Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
- return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
existingAndQuarantinedBookies, addr, excludeBookies);
} catch (BKNotEnoughBookiesException e) {
if (logger.isDebugEnabled()) {
logger.debug("Not enough healthy bookies available, using quarantined bookies");
}
- return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 640bdb7..5a2c1f2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -45,8 +45,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
return newBookies;
@@ -70,11 +69,9 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble,
- BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
excludeBookies.addAll(currentEnsemble);
- ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, excludeBookies);
+ ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, customMetadata, excludeBookies);
return addresses.get(0);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 2af8108..4a0f307 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -80,28 +80,32 @@ public interface EnsemblePlacementPolicy {
* Ensemble Size
* @param writeQuorumSize
* Write Quorum Size
- * @param excludeBookies
- * Bookies that should not be considered as targets.
- * @return list of bookies chosen as targets.
+ * @param ackQuorumSize
+ * the value of ackQuorumSize
+ * @param customMetadata the value of customMetadata
+ * @param excludeBookies Bookies that should not be considered as targets.
* @throws BKNotEnoughBookiesException if not enough bookies available.
+ * @return the java.util.ArrayList<org.apache.bookkeeper.net.BookieSocketAddress>
*/
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
/**
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
* {@link BKNotEnoughBookiesException} is thrown.
*
- * @param bookieToReplace
- * bookie to replace
- * @param excludeBookies
- * bookies that should not be considered as candidate.
- * @return the bookie chosen as target.
+ * @param ensembleSize
+ * the value of ensembleSize
+ * @param writeQuorumSize
+ * the value of writeQuorumSize
+ * @param ackQuorumSize the value of ackQuorumSize
+ * @param customMetadata the value of customMetadata
+ * @param currentEnsemble the value of currentEnsemble
+ * @param bookieToReplace bookie to replace
+ * @param excludeBookies bookies that should not be considered as candidate.
* @throws BKNotEnoughBookiesException
+ * @return the org.apache.bookkeeper.net.BookieSocketAddress
*/
- public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
/**
* Reorder the read sequence of a given write quorum <i>writeSet</i>.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index e88df31..52a5cb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -107,7 +107,8 @@ class LedgerCreateOp implements GenericCallback<Void> {
ensemble = bk.bookieWatcher
.newEnsemble(metadata.getEnsembleSize(),
metadata.getWriteQuorumSize(),
- metadata.getAckQuorumSize());
+ metadata.getAckQuorumSize(),
+ metadata.getCustomMetadata());
} catch (BKNotEnoughBookiesException e) {
LOG.error("Not enough bookies to create ledger");
createComplete(e.getCode(), null);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 5c33929..11212a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -995,7 +995,9 @@ public class LedgerHandle implements AutoCloseable {
newEnsemble.addAll(metadata.currentEnsemble);
newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(),
metadata.getWriteQuorumSize(),
- metadata.getAckQuorumSize(), newEnsemble,
+ metadata.getAckQuorumSize(),
+ metadata.getCustomMetadata(),
+ newEnsemble,
bookieIndex, new HashSet<>(Arrays.asList(addr)));
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index f42e42a..c306ca0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -79,39 +79,31 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
@Override
public ArrayList<BookieSocketAddress> newEnsemble(
- int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies)
+ int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
- return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+ return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies);
} catch (BKException.BKNotEnoughBookiesException bnebe) {
if (slave == null) {
throw bnebe;
} else {
- return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+ return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, excludeBookies);
}
}
}
@Override
public BookieSocketAddress replaceBookie(
- int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize,
- Collection<BookieSocketAddress> currentEnsemble,
- BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies)
+ int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
throws BKException.BKNotEnoughBookiesException {
try {
- return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
currentEnsemble, bookieToReplace, excludeBookies);
} catch (BKException.BKNotEnoughBookiesException bnebe) {
if (slave == null) {
throw bnebe;
} else {
- return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,customMetadata,
currentEnsemble, bookieToReplace, excludeBookies);
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 3c41a7c..79ff0da 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -259,8 +259,7 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies)
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
}
@@ -355,10 +354,7 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Collection<BookieSocketAddress> currentEnsemble,
- BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies)
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
rwLock.readLock().lock();
try {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 181feca..265499c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -212,8 +212,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
@@ -392,8 +391,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
}
@Override
- public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
rwLock.readLock().lock();
try {
boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
new file mode 100644
index 0000000..e28a691
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/GenericEnsemblePlacementPolicyTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GenericEnsemblePlacementPolicyTest extends BookKeeperClusterTestCase {
+
+ private BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
+ private static final String PASSWORD = "testPasswd";
+ private static final String property = "foo";
+ private static final byte[] value = "bar".getBytes(StandardCharsets.UTF_8);
+ private static List<Map<String, byte[]>> customMetadataOnNewEnsembleStack = new ArrayList<>();
+ private static List<Map<String, byte[]>> customMetadataOnReplaceBookieStack = new ArrayList<>();
+
+ public GenericEnsemblePlacementPolicyTest() {
+ super(0);
+ baseClientConf.setEnsemblePlacementPolicy(CustomEnsemblePlacementPolicy.class);
+ }
+
+ public static final class CustomEnsemblePlacementPolicy extends DefaultEnsemblePlacementPolicy {
+
+ @Override
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ new Exception("replaceBookie " + ensembleSize + "," + customMetadata).printStackTrace();
+ assertNotNull(customMetadata);
+ customMetadataOnReplaceBookieStack.add(customMetadata);
+ return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
+ currentEnsemble, bookieToReplace, excludeBookies);
+ }
+
+ @Override
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+ int ackQuorumSize, Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ assertNotNull(customMetadata);
+ customMetadataOnNewEnsembleStack.add(customMetadata);
+ return super.newEnsemble(ensembleSize, quorumSize, ackQuorumSize, customMetadata, excludeBookies);
+ }
+
+ }
+
+ @Before
+ public void reset() {
+ customMetadataOnNewEnsembleStack.clear();
+ customMetadataOnReplaceBookieStack.clear();
+ }
+
+ @Test(timeout = 60000)
+ public void testNewEnsemble() throws Exception {
+ numBookies = 1;
+ startBKCluster();
+ try {
+ Map<String, byte[]> customMetadata = new HashMap<>();
+ customMetadata.put(property, value);
+ try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+ bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata);
+ }
+ assertEquals(1, customMetadataOnNewEnsembleStack.size());
+ assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+ } finally {
+ stopBKCluster();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testNewEnsembleWithNotEnoughtBookies() throws Exception {
+ numBookies = 0;
+ try {
+ startBKCluster();
+ Map<String, byte[]> customMetadata = new HashMap<>();
+ customMetadata.put(property, value);
+ try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+ bk.createLedger(1, 1, 1, digestType, PASSWORD.getBytes(), customMetadata);
+ fail("creation should fail");
+ } catch (BKException.BKNotEnoughBookiesException bneb) {
+ }
+ assertEquals(2, customMetadataOnNewEnsembleStack.size());
+ assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+ assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(1).get(property));
+ } finally {
+ stopBKCluster();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testReplaceBookie() throws Exception {
+ numBookies = 3;
+ startBKCluster();
+ try {
+ Map<String, byte[]> customMetadata = new HashMap<>();
+ customMetadata.put(property, value);
+ try (BookKeeper bk = new BookKeeper(baseClientConf, zkc);) {
+ try (LedgerHandle lh = bk.createLedger(2, 2, 2, digestType, PASSWORD.getBytes(), customMetadata);) {
+ lh.addEntry(value);
+ long lId = lh.getId();
+ ArrayList<BookieSocketAddress> ensembleAtFirstEntry = lh.getLedgerMetadata().getEnsemble(lId);
+ assertEquals(2, ensembleAtFirstEntry.size());
+ killBookie(ensembleAtFirstEntry.get(0));
+ lh.addEntry(value);
+ }
+ }
+ assertEquals(2, customMetadataOnNewEnsembleStack.size());
+ assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+ // replaceBookie by default calls newEnsemble, so newEnsemble gets called twice
+ assertArrayEquals(value, customMetadataOnNewEnsembleStack.get(0).get(property));
+
+ assertEquals(1, customMetadataOnReplaceBookieStack.size());
+ assertArrayEquals(value, customMetadataOnReplaceBookieStack.get(0).get(property));
+
+ } finally {
+ stopBKCluster();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 4f36902..bef6bc2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -240,7 +240,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
assertEquals(addr3, replacedBookie);
}
@@ -265,7 +265,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -295,7 +295,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -316,9 +316,9 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
@@ -344,10 +344,10 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
int numCovered = getNumCoveredWriteQuorums(ensemble, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
numCovered = getNumCoveredWriteQuorums(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -386,9 +386,9 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(3, getNumCoveredWriteQuorums(ensemble1, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(4, getNumCoveredWriteQuorums(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
@@ -489,12 +489,12 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
// we will never use addr4 even it is in the stabilized network topology
for (int i = 0 ; i < 5; i++) {
ArrayList<BookieSocketAddress> ensemble =
- repp.newEnsemble(3, 3, 3, new HashSet<BookieSocketAddress>());
+ repp.newEnsemble(3, 3, 3, null, new HashSet<BookieSocketAddress>());
assertFalse(ensemble.contains(addr4));
}
// we could still use addr4 for urgent allocation if it is just bookie flapping
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(4, 4, 4, null, new HashSet<BookieSocketAddress>());
assertTrue(ensemble.contains(addr4));
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/95ea4815/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index 9a32986..5c61ae3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -269,7 +269,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, new HashSet<BookieSocketAddress>());
assertEquals(addr3, replacedBookie);
}
@@ -294,7 +294,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
- BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
@@ -319,7 +319,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 3, null, new HashSet<BookieSocketAddress>());
LOG.info("Ensemble : {}", list);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -351,7 +351,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
- repp.replaceBookie(1, 1, 1, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
+ repp.replaceBookie(1, 1, 1, null, new HashSet<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
@@ -380,9 +380,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(0, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
@@ -411,7 +411,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble, 2);
assertTrue(numCovered >= 1);
assertTrue(numCovered < 3);
@@ -419,7 +419,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
fail("Should not get not enough bookies exception even there is only one rack.");
}
try {
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
int numCovered = getNumCoveredRegionsInWriteQuorum(ensemble2, 2);
assertTrue(numCovered >= 1 && numCovered < 3);
} catch (BKNotEnoughBookiesException bnebe) {
@@ -458,9 +458,9 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble1 = repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(3, getNumCoveredRegionsInWriteQuorum(ensemble1, 2));
- ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble2 = repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
assertEquals(4, getNumCoveredRegionsInWriteQuorum(ensemble2, 2));
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
@@ -507,22 +507,22 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
addrs.add(addr10);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(7, 7, 4, new HashSet<BookieSocketAddress>());
+ ensemble = repp.newEnsemble(7, 7, 4, null, new HashSet<BookieSocketAddress>());
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 7);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(8, 8, 5, new HashSet<BookieSocketAddress>());
+ ensemble = repp.newEnsemble(8, 8, 5, null, new HashSet<BookieSocketAddress>());
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 8);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
- ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+ ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
assert(ensemble.contains(addr4));
assert(ensemble.contains(addr8));
assert(ensemble.size() == 9);
@@ -575,7 +575,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
((SettableFeature) featureProvider.scope("region1").getFeature("disallowBookies")).set(true);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
assertEquals(2, getNumRegionsInEnsemble(ensemble));
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
@@ -589,14 +589,14 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
}
try {
((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(true);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
fail("Should get not enough bookies exception even there is only one region with insufficient bookies.");
} catch (BKNotEnoughBookiesException bnebe) {
// Expected
}
try {
((SettableFeature) featureProvider.scope("region2").getFeature("disallowBookies")).set(false);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
assert(ensemble.contains(addr1));
assert(ensemble.contains(addr3));
assert(ensemble.contains(addr4));
@@ -669,7 +669,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, new HashSet<BookieSocketAddress>());
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -680,7 +680,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
try{
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
excludedAddrs.add(addr10);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, excludedAddrs);
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(10, 10, 10, null, excludedAddrs);
assert(ensemble.contains(addr11) && ensemble.contains(addr12));
assert(ensemble.size() == 10);
assertEquals(5, getNumRegionsInEnsemble(ensemble));
@@ -771,7 +771,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
ArrayList<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, ackQuorum, new HashSet<BookieSocketAddress>());
+ ensemble = repp.newEnsemble(6, 6, ackQuorum, null, new HashSet<BookieSocketAddress>());
assert(ensemble.size() == 6);
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -792,7 +792,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
for(BookieSocketAddress addr: region2Bookies) {
if (ensemble.contains(addr)) {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, addr, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, addr, excludedAddrs);
ensemble.remove(addr);
ensemble.add(replacedBookie);
}
@@ -816,7 +816,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs);
assert (replacedBookie.equals(replacedBookieExpected));
assertEquals(3, getNumRegionsInEnsemble(ensemble));
} catch (BKNotEnoughBookiesException bnebe) {
@@ -825,7 +825,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
excludedAddrs.add(replacedBookieExpected);
try {
- BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, ensemble, bookieToReplace, excludedAddrs);
+ BookieSocketAddress replacedBookie = repp.replaceBookie(6, 6, ackQuorum, null, ensemble, bookieToReplace, excludedAddrs);
if (minDurability > 1 && !disableDurabilityFeature.isAvailable()) {
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
}
@@ -900,7 +900,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
ArrayList<BookieSocketAddress> ensemble;
try {
- ensemble = repp.newEnsemble(6, 6, 4, new HashSet<BookieSocketAddress>());
+ ensemble = repp.newEnsemble(6, 6, 4, null, new HashSet<BookieSocketAddress>());
assert(ensemble.size() == 6);
} catch (BKNotEnoughBookiesException bnebe) {
LOG.error("BKNotEnoughBookiesException", bnebe);
@@ -911,7 +911,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
try{
- repp.replaceBookie(6, 6, 4, ensemble, addr4, excludedAddrs);
+ repp.replaceBookie(6, 6, 4, null, ensemble, addr4, excludedAddrs);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
}
@@ -964,7 +964,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
excludedAddrs.add(addr10);
excludedAddrs.add(addr9);
try {
- ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, excludedAddrs);
+ ArrayList<BookieSocketAddress> list = repp.newEnsemble(5, 5, 5, null, excludedAddrs);
LOG.info("Ensemble : {}", list);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
@@ -1025,7 +1025,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
private void basicReorderReadSequenceWithLocalRegionTest(String myRegion, boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1076,7 +1076,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
private void basicReorderReadSequenceWithRemoteRegionTest(String myRegion, boolean isReadLAC) throws Exception {
prepareNetworkTopologyForReorderTests(myRegion);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);
@@ -1139,7 +1139,7 @@ public class TestRegionAwareEnsemblePlacementPolicy extends TestCase {
prepareNetworkTopologyForReorderTests(myRegion);
- ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, new HashSet<BookieSocketAddress>());
+ ArrayList<BookieSocketAddress> ensemble = repp.newEnsemble(9, 9, 5, null, new HashSet<BookieSocketAddress>());
assertEquals(9, getNumCoveredRegionsInWriteQuorum(ensemble, 9));
DistributionSchedule ds = new RoundRobinDistributionSchedule(9, 9, 9);