You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/14 08:33:27 UTC

[pulsar] branch master updated: Allow Pulsar to use BookieID instead of Bookie Network Addresses (Update BK to 4.12.1) (#9019)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c60262  Allow Pulsar to use BookieID instead of Bookie Network Addresses (Update BK to 4.12.1) (#9019)
4c60262 is described below

commit 4c6026213b743a7f23ae2a5a6d37ee7404b066db
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Jan 14 09:32:57 2021 +0100

    Allow Pulsar to use BookieID instead of Bookie Network Addresses (Update BK to 4.12.1) (#9019)
    
    This change allows ZkBookieRackAffinityMapping to deal with BookieId instead of raw BookieSocketAddresses.
    
    Summary of changes:
    - upgrade to BK 4.12.1
    - use BookieAddressResolver in ZkBookieRackAffinityMapping
    - Start BookieServer passing "null" as BookieServiceInfo provider instead of BookieServiceInfo.NO_INFO (this change allows the Bookie bundled with Pulsar to publish local endpoints)
    - the BK 4.12.1 upgrade also transitively upgrades Apache Curator from 4.0.1 to 5.1.0 (Curator is used by BK StreamStorage)
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 58 +++++++++++-----------
 pom.xml                                            |  2 +-
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  3 +-
 .../pulsar/broker/service/RackAwareTest.java       |  2 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  5 +-
 pulsar-sql/presto-distribution/LICENSE             | 24 ++++-----
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |  9 ++--
 .../zookeeper/ZkBookieRackAffinityMapping.java     |  8 +--
 .../zookeeper/ZkBookieRackAffinityMappingTest.java |  4 ++
 9 files changed, 60 insertions(+), 55 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 344971b..b5bd7de 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -395,32 +395,32 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-1.2-api-2.14.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.12.0.jar
-    - org.apache.bookkeeper-bookkeeper-common-allocator-4.12.0.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.12.0.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.12.0.jar
-    - org.apache.bookkeeper-bookkeeper-tools-framework-4.12.0.jar
-    - org.apache.bookkeeper-circe-checksum-4.12.0.jar
-    - org.apache.bookkeeper-cpu-affinity-4.12.0.jar
-    - org.apache.bookkeeper-statelib-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-api-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-common-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-server-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.12.0.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.12.0.jar
-    - org.apache.bookkeeper.http-http-server-4.12.0.jar
-    - org.apache.bookkeeper.http-vertx-http-server-4.12.0.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.0.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.0.jar
-    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.0.jar
-    - org.apache.distributedlog-distributedlog-common-4.12.0.jar
-    - org.apache.distributedlog-distributedlog-core-4.12.0-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.12.0.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.12.0.jar
-    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.0.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.12.1.jar
+    - org.apache.bookkeeper-bookkeeper-common-allocator-4.12.1.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.12.1.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.12.1.jar
+    - org.apache.bookkeeper-bookkeeper-tools-framework-4.12.1.jar
+    - org.apache.bookkeeper-circe-checksum-4.12.1.jar
+    - org.apache.bookkeeper-cpu-affinity-4.12.1.jar
+    - org.apache.bookkeeper-statelib-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-api-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-common-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-server-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.12.1.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.12.1.jar
+    - org.apache.bookkeeper.http-http-server-4.12.1.jar
+    - org.apache.bookkeeper.http-vertx-http-server-4.12.1.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.12.1.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.12.1.jar
+    - org.apache.bookkeeper.tests-stream-storage-tests-common-4.12.1.jar
+    - org.apache.distributedlog-distributedlog-common-4.12.1.jar
+    - org.apache.distributedlog-distributedlog-core-4.12.1-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.12.1.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.12.1.jar
+    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.12.1.jar
   * Apache HTTP Client
     - org.apache.httpcomponents-httpclient-4.5.5.jar
     - org.apache.httpcomponents-httpcore-4.4.9.jar
@@ -476,9 +476,9 @@ The Apache Software License, Version 2.0
     - org.apache.avro-avro-1.9.1.jar
     - org.apache.avro-avro-protobuf-1.9.1.jar
   * Apache Curator
-    - org.apache.curator-curator-client-4.0.1.jar
-    - org.apache.curator-curator-framework-4.0.1.jar
-    - org.apache.curator-curator-recipes-4.0.1.jar
+    - org.apache.curator-curator-client-5.1.0.jar
+    - org.apache.curator-curator-framework-5.1.0.jar
+    - org.apache.curator-curator-recipes-5.1.0.jar
   * Apache Yetus
     - org.apache.yetus-audience-annotations-0.5.0.jar
   * @FreeBuilder
diff --git a/pom.xml b/pom.xml
index 0703117..416ebe0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@ flexible messaging model and an intuitive client API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.19</commons-compress.version>
 
-    <bookkeeper.version>4.12.0</bookkeeper.version>
+    <bookkeeper.version>4.12.1</bookkeeper.version>
     <zookeeper.version>3.5.7</zookeeper.version>
     <netty.version>4.1.51.Final</netty.version>
     <netty-tc-native.version>2.0.33.Final</netty-tc-native.version>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index bfe14d2..dfd7d2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -38,7 +38,6 @@ import java.util.Date;
 import java.util.Optional;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.stats.StatsProvider;
@@ -225,7 +224,7 @@ public class PulsarBrokerStarter {
                 checkNotNull(bookieConfig, "No ServerConfiguration for Bookie");
                 checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie");
                 bookieServer = new BookieServer(
-                        bookieConfig, bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO);
+                        bookieConfig, bookieStatsProvider.getStatsLogger(""), null);
             } else {
                 bookieServer = null;
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 8d9ea4e..4845d50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -71,7 +71,7 @@ public class RackAwareTest extends BkEnsemblesTestBase {
             String addr = String.format("10.0.0.%d", i + 1);
             conf.setAdvertisedAddress(addr);
 
-            BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
+            BookieServer bs = new BookieServer(conf, NullStatsLogger.INSTANCE, null);
 
             bs.start();
             bookies.add(bs);
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 042e7fa..28633e9 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -59,6 +59,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
 
 /**
  * A class runs several bookie servers for testing.
@@ -126,7 +127,7 @@ public abstract class BookKeeperClusterTestCase {
      * @throws Exception
      */
     protected void startZKCluster() throws Exception {
-        zkUtil.startServer();
+        zkUtil.startCluster();
         zkc = zkUtil.getZooKeeperClient();
     }
 
@@ -136,7 +137,7 @@ public abstract class BookKeeperClusterTestCase {
      * @throws Exception
      */
     protected void stopZKCluster() throws Exception {
-        zkUtil.killServer();
+        zkUtil.stopCluster();
     }
 
     /**
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index cd03996..4126581 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -411,18 +411,18 @@ The Apache Software License, Version 2.0
     - async-http-client-2.12.1.jar
     - async-http-client-netty-utils-2.12.1.jar
   * Apache Bookkeeper
-    - bookkeeper-common-4.12.0.jar
-    - bookkeeper-common-allocator-4.12.0.jar
-    - bookkeeper-proto-4.12.0.jar
-    - bookkeeper-server-4.12.0.jar
-    - bookkeeper-stats-api-4.12.0.jar
-    - bookkeeper-tools-framework-4.12.0.jar
-    - circe-checksum-4.12.0.jar
-    - codahale-metrics-provider-4.12.0jar
-    - cpu-affinity-4.12.0.jar
-    - http-server-4.12.0.jar
-    - prometheus-metrics-provider-4.12.0.jar
-    - codahale-metrics-provider-4.12.0.jar
+    - bookkeeper-common-4.12.1.jar
+    - bookkeeper-common-allocator-4.12.1.jar
+    - bookkeeper-proto-4.12.1.jar
+    - bookkeeper-server-4.12.1.jar
+    - bookkeeper-stats-api-4.12.1.jar
+    - bookkeeper-tools-framework-4.12.1.jar
+    - circe-checksum-4.12.1.jar
+    - codahale-metrics-provider-4.12.1jar
+    - cpu-affinity-4.12.1.jar
+    - http-server-4.12.1.jar
+    - prometheus-metrics-provider-4.12.1.jar
+    - codahale-metrics-provider-4.12.1.jar
   * Apache Commons
     - commons-cli-1.2.jar
     - commons-codec-1.10.jar
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 0672c26..6a1b635 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -55,7 +55,6 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -299,7 +298,7 @@ public class LocalBookkeeperEnsemble {
             bsConfs[i].setAllowEphemeralPorts(true);
 
             try {
-                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
+                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
             } catch (InvalidCookieException e) {
                 // InvalidCookieException can happen if the machine IP has changed
                 // Since we are running here a local bookie that is always accessed
@@ -312,7 +311,7 @@ public class LocalBookkeeperEnsemble {
                 new File(new File(bkDataDir, "current"), "VERSION").delete();
 
                 // Retry to start the bookie after cleaning the old left cookie
-                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
+                bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
             }
             bs[i].start();
             LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
@@ -446,7 +445,7 @@ public class LocalBookkeeperEnsemble {
 
     public void startBK(int i) throws Exception {
         try {
-            bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
+            bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
         } catch (InvalidCookieException e) {
             // InvalidCookieException can happen if the machine IP has changed
             // Since we are running here a local bookie that is always accessed
@@ -459,7 +458,7 @@ public class LocalBookkeeperEnsemble {
             new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
 
             // Retry to start the bookie after cleaning the old left cookie
-            bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, BookieServiceInfo.NO_INFO);
+            bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
 
         }
         bs[i].start();
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 1062942..b618771 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieNode;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.common.policies.data.BookieInfo;
@@ -92,7 +93,8 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
         racks.forEach((group, bookies) ->
                 bookies.forEach((addr, bi) -> {
                     try {
-                        BookieSocketAddress bsa = new BookieSocketAddress(addr);
+                        BookieId bookieId = BookieId.parse(addr);
+                        BookieSocketAddress bsa = getBookieAddressResolver().resolve(bookieId);
                         newRacksWithHost.updateBookie(group, bsa.toString(), bi);
 
                         String hostname = bsa.getSocketAddress().getHostName();
@@ -107,8 +109,8 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                         } else {
                             LOG.info("Network address for {} is unresolvable yet.", addr);
                         }
-                    } catch (UnknownHostException e) {
-                        throw new RuntimeException(e);
+                    } catch (BookieAddressResolver.BookieIdNotResolvedException e) {
+                        LOG.info("Network address for {} is unresolvable yet. error is {}", addr, e);
                     }
                 })
         );
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
index 0418e10..3e73d5a 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
@@ -81,6 +81,7 @@ public class ZkBookieRackAffinityMappingTest {
         bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
         });
         assertNull(bkClientConf1.getProperty(ZkBookieRackAffinityMapping.ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE));
+        mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping1.setConf(bkClientConf1);
         List<String> racks1 = mapping1
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
@@ -95,6 +96,7 @@ public class ZkBookieRackAffinityMappingTest {
         ClientConfiguration bkClientConf2 = new ClientConfiguration();
         bkClientConf2.setZkServers("127.0.0.1" + ":" + localZkS.getZookeeperPort());
         bkClientConf2.setZkTimeout(1000);
+        mapping2.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping2.setConf(bkClientConf2);
         List<String> racks2 = mapping2
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
@@ -111,6 +113,7 @@ public class ZkBookieRackAffinityMappingTest {
         ClientConfiguration bkClientConf = new ClientConfiguration();
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
         });
+        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping.setConf(bkClientConf);
         List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
         assertEquals(racks.get(0), null);
@@ -155,6 +158,7 @@ public class ZkBookieRackAffinityMappingTest {
         ClientConfiguration bkClientConf = new ClientConfiguration();
         bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
         });
+        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping.setConf(bkClientConf);
         List<String> racks = mapping
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));