You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/01 06:35:01 UTC
[pulsar] branch master updated: [pulsar-broker] tenant based bookie
isolation (#3933)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ac44fe [pulsar-broker] tenant based bookie isolation (#3933)
5ac44fe is described below
commit 5ac44fe06852aa6dc445725efd5abffd81aa6d75
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri May 31 23:34:54 2019 -0700
[pulsar-broker] tenant based bookie isolation (#3933)
fix tests
add admin cli
fix test
---
.../bookkeeper/mledger/ManagedLedgerConfig.java | 45 ++++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 101 ++++++-
.../pulsar/broker/BookKeeperClientFactory.java | 8 +-
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 63 ++++-
.../pulsar/broker/ManagedLedgerClientFactory.java | 59 +++-
.../org/apache/pulsar/broker/PulsarService.java | 4 +
.../pulsar/broker/admin/impl/NamespacesBase.java | 92 +++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 24 ++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 24 ++
.../pulsar/broker/service/BrokerService.java | 16 ++
.../service/schema/BookkeeperSchemaStorage.java | 4 +-
.../apache/pulsar/compaction/CompactorTool.java | 3 +-
.../broker/MockedBookKeeperClientFactory.java | 7 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 7 +-
.../broker/service/BrokerBookieIsolationTest.java | 300 +++++++++++++++++++++
.../pulsar/compaction/CompactedTopicTest.java | 5 +-
.../apache/pulsar/compaction/CompactionTest.java | 3 +-
.../apache/pulsar/compaction/CompactorTest.java | 5 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 4 +
.../client/admin/internal/NamespacesImpl.java | 22 ++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 6 +
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 32 +++
.../pulsar/common/policies/data/LocalPolicies.java | 6 +-
.../zookeeper/ZkBookieRackAffinityMapping.java | 2 +
24 files changed, 797 insertions(+), 45 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 0e77aff..4af66eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -24,7 +24,10 @@ import com.google.common.annotations.Beta;
import com.google.common.base.Charsets;
import java.time.Clock;
import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
@@ -61,6 +64,8 @@ public class ManagedLedgerConfig {
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
private boolean unackedRangesOpenCacheSetEnabled = true;
+ private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
+ private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private Clock clock = Clock.systemUTC();
@@ -581,4 +586,44 @@ public class ManagedLedgerConfig {
this.addEntryTimeoutSeconds = addEntryTimeoutSeconds;
return this;
}
+
+ /**
+ * Managed-ledger can setup different custom EnsemblePlacementPolicy (eg: affinity to write ledgers to only setup of
+ * group of bookies).
+ *
+ * @return
+ */
+ public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPolicyClassName() {
+ return bookKeeperEnsemblePlacementPolicyClassName;
+ }
+
+ /**
+ * Returns EnsemblePlacementPolicy configured for the Managed-ledger.
+ *
+ * @param bookKeeperEnsemblePlacementPolicy
+ */
+ public void setBookKeeperEnsemblePlacementPolicyClassName(
+ Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName) {
+ this.bookKeeperEnsemblePlacementPolicyClassName = bookKeeperEnsemblePlacementPolicyClassName;
+ }
+
+ /**
+ * Returns properties required by configured bookKeeperEnsemblePlacementPolicy.
+ *
+ * @return
+ */
+ public Map<String, Object> getBookKeeperEnsemblePlacementPolicyProperties() {
+ return bookKeeperEnsemblePlacementPolicyProperties;
+ }
+
+ /**
+ * Managed-ledger can setup different custom EnsemblePlacementPolicy which needs
+ * bookKeeperEnsemblePlacementPolicy-Properties.
+ *
+ * @param bookKeeperEnsemblePlacementPolicyProperties
+ */
+ public void setBookKeeperEnsemblePlacementPolicyProperties(
+ Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties) {
+ this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 7399233..95ee6b4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
+import com.google.common.base.Objects;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -79,7 +81,7 @@ import org.slf4j.LoggerFactory;
public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetaStore store;
- private final BookKeeper bookKeeper;
+ private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
private final boolean isBookkeeperManaged;
private final ZooKeeper zookeeper;
private final ManagedLedgerFactoryConfig config;
@@ -116,20 +118,30 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
- this(new BookKeeper(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc,
- config);
+ this((policyConfig) -> {
+ try {
+ return new BookKeeper(bkClientConfiguration, zkc);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }, true /* isBookkeeperManaged */, zkc, config);
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
- this(bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
+ this((policyConfig) -> bookKeeper, zooKeeper, new ManagedLedgerFactoryConfig());
}
public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
throws Exception {
- this(bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+ this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
}
- private ManagedLedgerFactoryImpl(BookKeeper bookKeeper, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
+ public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
+ throws Exception {
+ this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config);
+ }
+
+ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
ManagedLedgerFactoryConfig config) throws Exception {
scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(config.getNumManagedLedgerSchedulerThreads())
@@ -142,7 +154,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
cacheEvictionExecutor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
- this.bookKeeper = bookKeeper;
+ this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.zookeeper = isBookkeeperManaged ? zooKeeper : null;
this.store = new MetaStoreImplZookeeper(zooKeeper, orderedExecutor);
@@ -283,7 +295,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
ledgers.computeIfAbsent(name, (mlName) -> {
// Create the managed ledger
CompletableFuture<ManagedLedgerImpl> future = new CompletableFuture<>();
- final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this, bookKeeper, store, config, scheduledExecutor,
+ final ManagedLedgerImpl newledger = new ManagedLedgerImpl(this,
+ bookkeeperFactory.get(
+ new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+ config.getBookKeeperEnsemblePlacementPolicyProperties())),
+ store, config, scheduledExecutor,
orderedExecutor, name);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
@@ -344,8 +360,11 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx) {
checkArgument(startPosition instanceof PositionImpl);
- ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this, bookKeeper, store, config,
- scheduledExecutor, orderedExecutor, managedLedgerName);
+ ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+ bookkeeperFactory
+ .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+ config.getBookKeeperEnsemblePlacementPolicyProperties())),
+ store, config, scheduledExecutor, orderedExecutor, managedLedgerName);
roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition).thenAccept(roCursor -> {
callback.openReadOnlyCursorComplete(roCursor, ctx);
@@ -409,7 +428,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
if (isBookkeeperManaged) {
try {
- bookKeeper.close();
+ BookKeeper bkFactory = bookkeeperFactory.get();
+ if (bkFactory != null) {
+ bkFactory.close();
+ }
} catch (BKException e) {
throw new ManagedLedgerException(e);
}
@@ -580,8 +602,63 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
public BookKeeper getBookKeeper() {
- return bookKeeper;
+ return bookkeeperFactory.get();
}
+ /**
+ * Factory to create Bookkeeper-client for a given ensemblePlacementPolicy
+ *
+ */
+ public static interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
+ default BookKeeper get() {
+ return get(null);
+ }
+
+ /**
+ * Returns Bk-Client for a given ensemblePlacementPolicyMetadata. It returns default bK-client if
+ * ensemblePlacementPolicyMetadata is null.
+ *
+ * @param ensemblePlacementPolicyMetadata
+ * @return
+ */
+ BookKeeper get(EnsemblePlacementPolicyConfig ensemblePlacementPolicyMetadata);
+ }
+
+ public static class EnsemblePlacementPolicyConfig {
+ private final Class<? extends EnsemblePlacementPolicy> policyClass;
+ private final Map<String, Object> properties;
+
+ public EnsemblePlacementPolicyConfig(Class<? extends EnsemblePlacementPolicy> policyClass,
+ Map<String, Object> properties) {
+ super();
+ this.policyClass = policyClass;
+ this.properties = properties;
+ }
+
+ public Class<? extends EnsemblePlacementPolicy> getPolicyClass() {
+ return policyClass;
+ }
+
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(policyClass != null ? policyClass.getName() : "", properties);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof EnsemblePlacementPolicyConfig) {
+ EnsemblePlacementPolicyConfig other = (EnsemblePlacementPolicyConfig) obj;
+ return Objects.equal(this.policyClass == null ? null : this.policyClass.getName(),
+ other.policyClass == null ? null : other.policyClass.getName())
+ && Objects.equal(this.properties, other.properties);
+ }
+ return false;
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerFactoryImpl.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index 9f91fcf..c0c43ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.broker;
import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.zookeeper.ZooKeeper;
@@ -28,6 +31,9 @@ import org.apache.zookeeper.ZooKeeper;
* Provider of a new BookKeeper client instance
*/
public interface BookKeeperClientFactory {
- BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException;
+ BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
+
void close();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index cc1a8e5..2105af5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -19,31 +19,55 @@
package org.apache.pulsar.broker;
import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+@SuppressWarnings("deprecation")
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
+ private final AtomicReference<ZooKeeperCache> zkCache = new AtomicReference<>();
- @SuppressWarnings("deprecation")
@Override
- public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
- ClientConfiguration bkConf = new ClientConfiguration();
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) throws IOException {
+ ClientConfiguration bkConf = createBkClientConfiguration(conf);
+ if (properties != null) {
+ properties.forEach((key, value) -> bkConf.setProperty(key, value));
+ }
+ if (ensemblePlacementPolicyClass.isPresent()) {
+ setEnsemblePlacementPolicy(bkConf, conf, zkClient, ensemblePlacementPolicyClass.get());
+ } else {
+ setDefaultEnsemblePlacementPolicy(bkConf, conf, zkClient);
+ }
+ try {
+ return BookKeeper.forConfig(bkConf)
+ .allocator(PulsarByteBufAllocator.DEFAULT)
+ .zk(zkClient)
+ .build();
+ } catch (InterruptedException | BKException e) {
+ throw new IOException(e);
+ }
+ }
+ private ClientConfiguration createBkClientConfiguration(ServiceConfiguration conf) {
+ ClientConfiguration bkConf = new ClientConfiguration();
if (conf.getBookkeeperClientAuthenticationPlugin() != null
&& conf.getBookkeeperClientAuthenticationPlugin().trim().length() > 0) {
bkConf.setClientAuthProviderFactoryClass(conf.getBookkeeperClientAuthenticationPlugin());
@@ -69,6 +93,13 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
TimeUnit.SECONDS);
}
+ bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
+
+ return bkConf;
+ }
+
+ private void setDefaultEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf,
+ ZooKeeper zkClient) {
if (conf.isBookkeeperClientRackawarePolicyEnabled() || conf.isBookkeeperClientRegionawarePolicyEnabled()) {
if (conf.isBookkeeperClientRegionawarePolicyEnabled()) {
bkConf.setEnsemblePlacementPolicy(RegionAwareEnsemblePlacementPolicy.class);
@@ -86,7 +117,6 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
}
- bkConf.setReorderReadSequenceEnabled(conf.isBookkeeperClientReorderReadSequenceEnabled());
if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
@@ -106,14 +136,18 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache.get());
}
}
+ }
- try {
- return BookKeeper.forConfig(bkConf)
- .allocator(PulsarByteBufAllocator.DEFAULT)
- .zk(zkClient)
- .build();
- } catch (InterruptedException | BKException e) {
- throw new IOException(e);
+ private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, ZooKeeper zkClient,
+ Class<? extends EnsemblePlacementPolicy> policyClass) {
+ bkConf.setEnsemblePlacementPolicy(policyClass);
+ if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
+ };
+ if (!zkCache.compareAndSet(null, zkc)) {
+ zkc.stop();
+ }
+ bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.zkCache.get());
}
}
@@ -124,5 +158,8 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
if (this.clientIsolationZkCache.get() != null) {
this.clientIsolationZkCache.get().stop();
}
+ if (this.zkCache.get() != null) {
+ this.zkCache.get().stop();
+ }
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 1f401d5..bb286ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -20,28 +20,34 @@ package org.apache.pulsar.broker;
import java.io.Closeable;
import java.io.IOException;
-
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
+
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
public class ManagedLedgerClientFactory implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
private final ManagedLedgerFactory managedLedgerFactory;
- private final BookKeeper bkClient;
+ private final BookKeeper defaultBkClient;
+ private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider) throws Exception {
- this.bkClient = bookkeeperProvider.create(conf, zkClient);
-
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -52,7 +58,30 @@ public class ManagedLedgerClientFactory implements Closeable {
managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
- this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, zkClient, managedLedgerFactoryConfig);
+ this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
+
+ BookkeeperFactoryForCustomEnsemblePlacementPolicy bkFactory = (
+ EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) -> {
+ BookKeeper bkClient = null;
+ // find or create bk-client in cache for a specific ensemblePlacementPolicy
+ if (ensemblePlacementPolicyConfig != null && ensemblePlacementPolicyConfig.getPolicyClass() != null) {
+ bkClient = bkEnsemblePolicyToBkClientMap.computeIfAbsent(ensemblePlacementPolicyConfig, (key) -> {
+ try {
+ return bookkeeperProvider.create(conf, zkClient,
+ Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()),
+ ensemblePlacementPolicyConfig.getProperties());
+ } catch (Exception e) {
+ log.error("Failed to initialize bk-client for policy {}, properties {}",
+ ensemblePlacementPolicyConfig.getPolicyClass(),
+ ensemblePlacementPolicyConfig.getProperties(), e);
+ }
+ return this.defaultBkClient;
+ });
+ }
+ return bkClient != null ? bkClient : defaultBkClient;
+ };
+
+ this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig);
}
public ManagedLedgerFactory getManagedLedgerFactory() {
@@ -60,7 +89,12 @@ public class ManagedLedgerClientFactory implements Closeable {
}
public BookKeeper getBookKeeperClient() {
- return bkClient;
+ return defaultBkClient;
+ }
+
+ @VisibleForTesting
+ public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
+ return bkEnsemblePolicyToBkClientMap;
}
public void close() throws IOException {
@@ -69,7 +103,7 @@ public class ManagedLedgerClientFactory implements Closeable {
log.info("Closed managed ledger factory");
try {
- bkClient.close();
+ defaultBkClient.close();
} catch (RejectedExecutionException ree) {
// when closing bookkeeper client, it will error outs all pending metadata operations.
// those callbacks of those operations will be triggered, and submitted to the scheduler
@@ -80,6 +114,17 @@ public class ManagedLedgerClientFactory implements Closeable {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper client", ree);
}
+ if (bkEnsemblePolicyToBkClientMap != null) {
+ bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
+ try {
+ if (bk != null) {
+ bk.close();
+ }
+ } catch (Exception e) {
+ log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
+ }
+ });
+ }
log.info("Closed BookKeeper client");
} catch (Exception e) {
log.warn(e.getMessage(), e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 934ce10..098d52d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -691,6 +691,10 @@ public class PulsarService implements AutoCloseable {
return managedLedgerClientFactory.getManagedLedgerFactory();
}
+ public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
+ return managedLedgerClientFactory;
+ }
+
public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f587735..48a3c5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.admin.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -72,6 +75,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -79,6 +83,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -577,6 +582,93 @@ public abstract class NamespacesBase extends AdminResource {
log.info("[{}] Successfully unloaded all the bundles in namespace {}", clientAppId(), namespaceName);
}
+
+ protected void internalSetBookieAffinityGroup(String bookieAffinityGroup) {
+ log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
+ this.namespaceName);
+
+ validateSuperUserAccess();
+
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+ validateGlobalNamespaceOwnership(namespaceName);
+ } else {
+ validateClusterOwnership(namespaceName.getCluster());
+ validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
+ }
+
+ try {
+ String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
+ Stat nodeStat = new Stat();
+
+ LocalPolicies localPolicies = null;
+ int version = -1;
+ try {
+ byte[] content = pulsar().getLocalZkCache().getZooKeeper().getData(path, false, nodeStat);
+ localPolicies = jsonMapper().readValue(content, LocalPolicies.class);
+ version = nodeStat.getVersion();
+ } catch (KeeperException.NoNodeException e) {
+ log.info("local-policies for {} is not setup at path {}", this.namespaceName, path);
+ // if policies is not present into localZk then create new policies
+ this.pulsar().getLocalZkCacheService().createPolicies(path, false)
+ .get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+ localPolicies = new LocalPolicies();
+ }
+ localPolicies.bookkeeperAffinityGroup = bookieAffinityGroup;
+ byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
+ pulsar().getLocalZkCache().getZooKeeper().setData(path, data, Math.toIntExact(version));
+ // invalidate namespace's local-policies
+ pulsar().getLocalZkCacheService().policiesCache().invalidate(path);
+ log.info("[{}] Successfully updated local-policies configuration: namespace={}, map={}", clientAppId(),
+ namespaceName, jsonMapper().writeValueAsString(localPolicies));
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to update persistence configuration for namespace {}: concurrent modification",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent modification");
+ } catch (Exception e) {
+ log.error("[{}] Failed to update local-policy configuration for namespace {}", clientAppId(), namespaceName,
+ e);
+ throw new RestException(e);
+ }
+ }
+
+ protected String internalGetBookieAffinityGroup() {
+ validateSuperUserAccess();
+
+ if (namespaceName.isGlobal()) {
+ // check cluster ownership for a given global namespace: redirect if peer-cluster owns it
+ validateGlobalNamespaceOwnership(namespaceName);
+ } else {
+ validateClusterOwnership(namespaceName.getCluster());
+ validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
+ }
+
+ String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
+ try {
+ Optional<LocalPolicies> policies = pulsar().getLocalZkCacheService().policiesCache().get(path);
+ final String bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ "Namespace local-policies does not exist")).bookkeeperAffinityGroup;
+ if (StringUtils.isBlank(bookkeeperAffinityGroup)) {
+ throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist");
+ }
+ return bookkeeperAffinityGroup;
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to update local-policy configuration for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace policies does not exist");
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception e) {
+ log.error("[{}] Failed to get local-policy configuration for namespace {} at path {}", clientAppId(),
+ namespaceName, path, e);
+ throw new RestException(e);
+ }
+ }
+
@SuppressWarnings("deprecation")
public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritative) {
log.info("[{}] Unloading namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 35b5713..4dfa6ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -551,6 +551,30 @@ public class Namespaces extends NamespacesBase {
internalSetPersistence(persistence);
}
+ @POST
+ @Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
+ @ApiOperation(hidden = true, value = "Set the bookie-affinity-group to namespace-local policy.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void setBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace, @PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
+ validateNamespaceName(property, cluster, namespace);
+ internalSetBookieAffinityGroup(bookieAffinityGroup);
+ }
+
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity")
+ @ApiOperation(hidden = true, value = "Get the bookie-affinity-group from namespace-local policy.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public String getBookieAffinityGroup(@PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, cluster, namespace);
+ return internalGetBookieAffinityGroup();
+ }
+
@GET
@Path("/{property}/{cluster}/{namespace}/persistence")
@ApiOperation(hidden = true, value = "Get the persistence configuration for a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b0715ff..d6b9042 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -491,6 +491,30 @@ public class Namespaces extends NamespacesBase {
internalSetPersistence(persistence);
}
+ @POST
+ @Path("/{tenant}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
+ @ApiOperation(value = "Set the bookie-affinity-group to namespace-persistent policy.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
+ validateNamespaceName(tenant, namespace);
+ internalSetBookieAffinityGroup(bookieAffinityGroup);
+ }
+
+ @GET
+ @Path("/{property}/{namespace}/persistence/bookieAffinity")
+ @ApiOperation(hidden = true, value = "Get the bookie-affinity-group from namespace-local policy.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public String getBookieAffinityGroup(@PathParam("property") String property,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(property, namespace);
+ return internalGetBookieAffinityGroup();
+ }
+
@GET
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Get the persistence configuration for a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8ceaee8..1254088 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,8 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -109,6 +111,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
@@ -122,12 +125,14 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -712,10 +717,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
// Get persistence policy for this topic
Optional<Policies> policies = Optional.empty();
+ Optional<LocalPolicies> localPolicies = Optional.empty();
try {
policies = pulsar
.getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
namespace.toString()));
+ String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+ localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
@@ -738,6 +746,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() && StringUtils.isNotBlank(localPolicies.get().bookkeeperAffinityGroup)) {
+ managedLedgerConfig
+ .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = Maps.newHashMap();
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookkeeperAffinityGroup);
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b7a531b..56fca20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -101,7 +101,9 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar.getConfiguration(),
- pulsar.getZkClient()
+ pulsar.getZkClient(),
+ Optional.empty(),
+ null
);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 0816625..f003b04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -26,6 +26,7 @@ import com.beust.jcommander.Parameter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.nio.file.Paths;
+import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -106,7 +107,7 @@ public class CompactorTool {
ZooKeeperClientFactory.SessionType.ReadWrite,
(int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new BookKeeperClientFactoryImpl();
- BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
+ BookKeeper bk = bkClientFactory.create(brokerConfig, zk, Optional.empty(), null);
try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 728bc67..4b7f49b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -22,10 +22,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.zookeeper.ZooKeeper;
@@ -51,7 +54,9 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
}
@Override
- public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) throws IOException {
return mockedBk;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index b987503..341b96a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -24,12 +24,12 @@ import static org.mockito.Mockito.spy;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -39,6 +39,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
@@ -274,7 +275,9 @@ public abstract class MockedPulsarServiceBaseTest {
private BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
@Override
- public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) {
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) {
// Always return the same instance (so that we don't loose the mock BK content on broker restart
return mockBookKeeper;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
new file mode 100644
index 0000000..5fe3f1d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ */
+public class BrokerBookieIsolationTest {
+
+ private LocalBookkeeperEnsemble bkEnsemble;
+ private PulsarService pulsarService;
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+ private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
+
+ private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
+
+ @BeforeMethod
+ protected void setup() throws Exception {
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(4, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+ bkEnsemble.start();
+ }
+
+ @AfterMethod
+ protected void cleanup() throws Exception {
+ if (pulsarService != null) {
+ pulsarService.close();
+ }
+ bkEnsemble.stop();
+ }
+
+ /**
+ * Validate that broker can support tenant based bookie isolation.
+ *
+ * <pre>
+ * 1. create two bookie-info group : default-group and isolated-group
+ * 2. namespace ns1 : uses default-group
+ * validate: bookie-ensemble for ns1-topics's ledger will be from default-group
+ * 3. namespace ns2,ns3,ns4: uses isolated-group
+ * validate: bookie-ensemble for above namespace-topics's ledger will be from isolated-group
+ * </pre>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBookieIsolation() throws Exception {
+
+ final String tenant1 = "tenant1";
+ final String cluster = "use";
+ final String ns1 = String.format("%s/%s/%s", tenant1, cluster, "ns1");
+ final String ns2 = String.format("%s/%s/%s", tenant1, cluster, "ns2");
+ final String ns3 = String.format("%s/%s/%s", tenant1, cluster, "ns3");
+ final String ns4 = String.format("%s/%s/%s", tenant1, cluster, "ns4");
+ final int totalPublish = 100;
+
+ final String brokerBookkeeperClientIsolationGroups = "default-group";
+ final String tenantNamespaceIsolationGroups = "tenant1-isolation";
+
+ BookieServer[] bookies = bkEnsemble.getBookies();
+ ZooKeeper zkClient = bkEnsemble.getZkClient();
+
+ Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
+ bookies[1].getLocalAddress());
+ Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
+ bookies[3].getLocalAddress());
+
+ setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
+ setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, isolatedBookies);
+
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ config.setClusterName(cluster);
+ config.setWebServicePort(Optional.of(PRIMARY_BROKER_WEBSERVICE_PORT));
+ config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config.setBrokerServicePort(Optional.of(PRIMARY_BROKER_PORT));
+ config.setAdvertisedAddress("localhost");
+ config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
+
+ config.setManagedLedgerDefaultEnsembleSize(2);
+ config.setManagedLedgerDefaultWriteQuorum(2);
+ config.setManagedLedgerDefaultAckQuorum(2);
+
+ int totalEntriesPerLedger = 20;
+ int totalLedgers = totalPublish / totalEntriesPerLedger;
+ config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
+ config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ pulsarService = new PulsarService(config);
+ pulsarService.start();
+
+ URL brokerUrl = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build();
+
+ ClusterData clusterData = new ClusterData(pulsarService.getWebServiceAddress());
+ admin.clusters().createCluster(cluster, clusterData);
+ TenantInfo tenantInfo = new TenantInfo(null, Sets.newHashSet(cluster));
+ admin.tenants().createTenant(tenant1, tenantInfo);
+ admin.namespaces().createNamespace(ns1);
+ admin.namespaces().createNamespace(ns2);
+ admin.namespaces().createNamespace(ns3);
+ admin.namespaces().createNamespace(ns4);
+ admin.namespaces().setBookieAffinityGroup(ns2, tenantNamespaceIsolationGroups);
+ admin.namespaces().setBookieAffinityGroup(ns3, tenantNamespaceIsolationGroups);
+ admin.namespaces().setBookieAffinityGroup(ns4, tenantNamespaceIsolationGroups);
+
+ assertEquals(admin.namespaces().getBookieAffinityGroup(ns2), tenantNamespaceIsolationGroups);
+ assertEquals(admin.namespaces().getBookieAffinityGroup(ns3), tenantNamespaceIsolationGroups);
+ assertEquals(admin.namespaces().getBookieAffinityGroup(ns4), tenantNamespaceIsolationGroups);
+
+ try {
+ admin.namespaces().getBookieAffinityGroup(ns1);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // Ok
+ }
+
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl.toString())
+ .statsInterval(-1, TimeUnit.SECONDS).build();
+
+ PersistentTopic topic1 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns1, "topic1", totalPublish);
+ PersistentTopic topic2 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns2, "topic1", totalPublish);
+ PersistentTopic topic3 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns3, "topic1", totalPublish);
+ PersistentTopic topic4 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns4, "topic1", totalPublish);
+
+ Bookie bookie1 = bookies[0].getBookie();
+ Field ledgerManagerField = Bookie.class.getDeclaredField("ledgerManager");
+ ledgerManagerField.setAccessible(true);
+ LedgerManager ledgerManager = (LedgerManager) ledgerManagerField.get(bookie1);
+
+ // namespace: ns1
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger();
+ assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ // validate ledgers' ensemble with affinity bookies
+ assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies);
+
+ // namespace: ns2
+ ml = (ManagedLedgerImpl) topic2.getManagedLedger();
+ assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ // validate ledgers' ensemble with affinity bookies
+ assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+ // namespace: ns3
+ ml = (ManagedLedgerImpl) topic3.getManagedLedger();
+ assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ // validate ledgers' ensemble with affinity bookies
+ assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+ // namespace: ns4
+ ml = (ManagedLedgerImpl) topic4.getManagedLedger();
+ assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers);
+ // validate ledgers' ensemble with affinity bookies
+ assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies);
+
+ ManagedLedgerClientFactory mlFactory = pulsarService.getManagedLedgerClientFactory();
+ Map<EnsemblePlacementPolicyConfig, BookKeeper> bkPlacementPolicyToBkClientMap = mlFactory
+ .getBkEnsemblePolicyToBookKeeperMap();
+
+ // broker should create only 1 bk-client and factory per isolation-group
+ assertEquals(bkPlacementPolicyToBkClientMap.size(), 1);
+ Class<? extends EnsemblePlacementPolicy> clazz = bkPlacementPolicyToBkClientMap.keySet().iterator().next()
+ .getPolicyClass();
+ System.out.println(clazz);
+
+ }
+
+ private void assertAffinityBookies(LedgerManager ledgerManager, List<LedgerInfo> ledgers1,
+ Set<BookieSocketAddress> defaultBookies) throws Exception {
+ for (LedgerInfo lInfo : ledgers1) {
+ long ledgerId = lInfo.getLedgerId();
+ CompletableFuture<Versioned<LedgerMetadata>> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId);
+ LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue();
+ Set<BookieSocketAddress> ledgerBookies = Sets.newHashSet();
+ ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next());
+ assertEquals(ledgerBookies.size(), defaultBookies.size());
+ ledgerBookies.removeAll(defaultBookies);
+ assertEquals(ledgerBookies.size(), 0);
+ }
+ }
+
+ private Topic createTopicAndPublish(PulsarClient pulsarClient, String ns, String topicLocalName, int totalPublish)
+ throws Exception {
+ final String topicName = String.format("persistent://%s/%s", ns, topicLocalName);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+ .subscribe();
+ consumer.close();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);
+
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < totalPublish; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+ producer.close();
+
+ return pulsarService.getBrokerService().getTopicReference(topicName).get();
+ }
+
+ private void setDefaultIsolationGroup(String brokerBookkeeperClientIsolationGroups, ZooKeeper zkClient,
+ Set<BookieSocketAddress> bookieAddresses) throws Exception {
+ BookiesRackConfiguration bookies = null;
+ try {
+ byte[] data = zkClient.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
+ System.out.println(new String(data));
+ bookies = jsonMapper.readValue(data, BookiesRackConfiguration.class);
+ } catch (KeeperException.NoNodeException e) {
+ // Ok.. create new bookie znode
+ zkClient.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "".getBytes(), Acl,
+ CreateMode.PERSISTENT);
+ }
+ if (bookies == null) {
+ bookies = new BookiesRackConfiguration();
+ }
+
+ Map<String, BookieInfo> bookieInfoMap = Maps.newHashMap();
+ for (BookieSocketAddress bkSocket : bookieAddresses) {
+ BookieInfo info = new BookieInfo("use", bkSocket.getHostName() + ":" + bkSocket.getPort());
+ bookieInfoMap.put(bkSocket.toString(), info);
+ }
+ bookies.put(brokerBookkeeperClientIsolationGroups, bookieInfoMap);
+
+ zkClient.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookies), -1);
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(BrokerBookieIsolationTest.class);
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 444d3e4..504de2a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -27,6 +27,7 @@ import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -136,7 +137,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
@Test
public void testEntryLookup() throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null);
+ this.conf, null, Optional.empty(), null);
Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> compactedLedgerData
= buildCompactedLedger(bk, 500);
@@ -192,7 +193,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
@Test
public void testCleanupOldCompactedTopicLedger() throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null);
+ this.conf, null, Optional.empty(), null);
LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 1928a32..1e68f80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -81,7 +82,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
- bk = pulsar.getBookKeeperClientFactory().create(this.conf, null);
+ bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, Optional.empty(), null);
}
@AfterMethod
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 32765ba..bddc2bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -31,6 +31,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -81,7 +82,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null);
+ this.conf, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
long compactedLedgerId = compactor.compact(topic).get();
@@ -214,7 +215,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null);
+ this.conf, null, Optional.empty(), null);
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topic).get();
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 29f509b..098a8f8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -761,6 +761,10 @@ public interface Namespaces {
*/
PersistencePolicies getPersistence(String namespace) throws PulsarAdminException;
+ void setBookieAffinityGroup(String namespace, String bookieAffinityGroup) throws PulsarAdminException;
+
+ String getBookieAffinityGroup(String namespace) throws PulsarAdminException;
+
/**
* Set the retention configuration for all the topics on a namespace.
* <p/>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 42cdfbd..a577e53 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -404,6 +404,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void setBookieAffinityGroup(String namespace, String bookieAffinityGroup) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence", "bookieAffinity", bookieAffinityGroup);
+ request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public String getBookieAffinityGroup(String namespace) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
+ return request(path).get(String.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
public PersistencePolicies getPersistence(String namespace) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 025a818..4721929 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -280,6 +280,12 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-clusters myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1");
+ namespaces.run(split("set-bookie-affinity-group myprop/clust/ns1 --group test"));
+ verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1", "test");
+
+ namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
+ verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");
+
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7b7043f..359f25b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -421,6 +421,35 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Set the bookie-affinity group name")
+ private class SetBookieAffinityGroup extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--group",
+ "-g" }, description = "Bookie-affinity group name where namespace messages should be written", required = true)
+ private String bookieAffinityGroupName;
+
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupName);
+ }
+ }
+
+ @Parameters(commandDescription = "Get the bookie-affinity group name")
+ private class GetBookieAffinityGroup extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getBookieAffinityGroup(namespace));
+ }
+ }
+
@Parameters(commandDescription = "Get message TTL for a namespace")
private class GetMessageTTL extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -1125,6 +1154,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
+
+ jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
+ jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
jcommander.addCommand("unload", new Unload());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index d664d73..dc8b6b1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
public class LocalPolicies {
public BundlesData bundles;
+ public String bookkeeperAffinityGroup;
public LocalPolicies() {
bundles = defaultBundle();
@@ -32,14 +33,15 @@ public class LocalPolicies {
@Override
public int hashCode() {
- return Objects.hashCode(bundles);
+ return Objects.hashCode(bundles, bookkeeperAffinityGroup);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof LocalPolicies) {
LocalPolicies other = (LocalPolicies) obj;
- return Objects.equal(bundles, other.bundles);
+ return Objects.equal(bundles, other.bundles)
+ && Objects.equal(bookkeeperAffinityGroup, other.bookkeeperAffinityGroup);
}
return false;
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 010d45a..ca0d95d 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.zookeeper;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.UnknownHostException;
@@ -38,6 +39,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;