You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/28 20:25:31 UTC

[pulsar] branch master updated: [pulsar-broker] Fix: Deserialization failing for ZkIsolatedBookieEnsemblePlacementPolicy (#3918)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 012b818  [pulsar-broker] Fix: Deserialization failing for ZkIsolatedBookieEnsemblePlacementPolicy (#3918)
012b818 is described below

commit 012b818bf4d6c0ea7d3b5cc4ea79d9ed5e14ac48
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Mar 28 13:25:26 2019 -0700

    [pulsar-broker] Fix: Deserialization failing for ZkIsolatedBookieEnsemblePlacementPolicy (#3918)
---
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java        | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index d55366a..fb8d4a9 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -36,7 +36,7 @@ import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
-import org.apache.pulsar.common.policies.data.BookieInfo;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.KeeperException;
@@ -44,13 +44,12 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import io.netty.util.HashedWheelTimer;
 
 public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy
-        implements Deserializer<Map<String, Map<BookieSocketAddress, BookieInfo>>> {
+        implements Deserializer<BookiesRackConfiguration> {
     private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
 
     public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups";
@@ -59,8 +58,6 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
 
     private final List<String> isolationGroups = new ArrayList<String>();
     private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
-    private final TypeReference<Map<String, Map<BookieSocketAddress, BookieInfo>>> typeRef = new TypeReference<Map<String, Map<BookieSocketAddress, BookieInfo>>>() {
-    };
 
     public ZkIsolatedBookieEnsemblePlacementPolicy() {
         super();
@@ -139,14 +136,14 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
         Set<BookieSocketAddress> blacklistedBookies = new HashSet<BookieSocketAddress>();
         try {
             if (bookieMappingCache != null) {
-                Map<String, Map<BookieSocketAddress, BookieInfo>> allGroupsBookieMapping = bookieMappingCache
+                BookiesRackConfiguration allGroupsBookieMapping = bookieMappingCache
                         .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
                         .orElseThrow(() -> new KeeperException.NoNodeException(
                                 ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
                 for (String group : allGroupsBookieMapping.keySet()) {
                     if (!isolationGroups.contains(group)) {
-                        for (BookieSocketAddress bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
-                            blacklistedBookies.add(bookieAddress);
+                        for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
+                            blacklistedBookies.add(new BookieSocketAddress(bookieAddress));
                         }
                     }
                 }
@@ -158,11 +155,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
     }
 
     @Override
-    public Map<String, Map<BookieSocketAddress, BookieInfo>> deserialize(String key, byte[] content) throws Exception {
+    public BookiesRackConfiguration deserialize(String key, byte[] content) throws Exception {
         LOG.info("Reloading the bookie isolation groups mapping cache.");
         if (LOG.isDebugEnabled()) {
             LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(content));
         }
-        return jsonMapper.readValue(content, typeRef);
+        return jsonMapper.readValue(content, BookiesRackConfiguration.class);
     }
 }