You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/21 15:09:19 UTC
[pulsar] branch branch-2.8 updated (dd8aeaf -> 5d52994)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.
from dd8aeaf Remove python3 from vcpkg.json (#12092)
new ae02ead Fixed NPE in ProxyConnection with no auth data (#12111)
new 5d52994 Bugfix: Fix rackaware placement policy init error (#12097)
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../pulsar/proxy/server/ProxyConnection.java | 4 ++-
.../zookeeper/ZkBookieRackAffinityMapping.java | 39 +++++++++++-----------
2 files changed, 23 insertions(+), 20 deletions(-)
[pulsar] 01/02: Fixed NPE in ProxyConnection with no auth data
(#12111)
Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ae02ead671984690de82a0a299ba69380c7c124c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Sep 20 22:04:44 2021 -0700
Fixed NPE in ProxyConnection with no auth data (#12111)
### Motivation
In #12057 there was a fix for missing authdata, but `AuthData.of()` is expecting a valid `byte[]` instance, empty if there are no credentials.
---
.../src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 28c6083..dd814cf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -92,6 +92,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private String proxyToBrokerUrl;
private HAProxyMessage haProxyMessage;
+ private static final byte[] EMPTY_CREDENTIALS = new byte[0];
+
enum State {
Init,
@@ -315,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
return;
}
- AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : null);
+ AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS);
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
[pulsar] 02/02: Bugfix: Fix rackaware placement policy init error
(#12097)
Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5d52994c6e6c2d8afe339597c70c0d1197b38eb2
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Mon Sep 20 09:22:21 2021 -0600
Bugfix: Fix rackaware placement policy init error (#12097)
Since the release of Pulsar 2.8 and upgrade to BK 4.12, the default
rackAwarePlacementPolicy has been failing with the following exception:
```
org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Failed to initialize DNS Resolver org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping, used default subnet resolver : java.lang.RuntimeException: java.lang.NullPointerException java.lang.NullPointerException
```
This regression occured in commit https://github.com/apache/pulsar/commit/4c6026213b743a7f23ae2a5a6d37ee7404b066db
The core of the issue is that `setConf` is called before
`setBookieAddressResolver` has been set (see
https://github.com/apache/bookkeeper/blob/034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java#L264-L286)
This results in the NPE.
We are safe to simply not eagerly init the cache in setConf as the
getRack call will re-check the cache.
We also protect against this possible NPE for safety
---
.../zookeeper/ZkBookieRackAffinityMapping.java | 39 +++++++++++-----------
1 file changed, 20 insertions(+), 19 deletions(-)
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 19dcb99..0bdffca 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -76,12 +76,8 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
conf.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, bookieMappingCache);
}
- try {
- BookiesRackConfiguration racks = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).orElseGet(BookiesRackConfiguration::new);
- updateRacksWithHost(racks);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ // A previous version of this code tried to eagerly load the cache. However, this is invalid
+ // in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set
}
private void updateRacksWithHost(BookiesRackConfiguration racks) {
@@ -94,20 +90,25 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
bookies.forEach((addr, bi) -> {
try {
BookieId bookieId = BookieId.parse(addr);
- BookieSocketAddress bsa = getBookieAddressResolver().resolve(bookieId);
- newRacksWithHost.updateBookie(group, bsa.toString(), bi);
-
- String hostname = bsa.getSocketAddress().getHostName();
- newBookieInfoMap.put(hostname, bi);
-
- InetAddress address = bsa.getSocketAddress().getAddress();
- if (null != address) {
- String hostIp = address.getHostAddress();
- if (null != hostIp) {
- newBookieInfoMap.put(hostIp, bi);
- }
+ BookieAddressResolver addressResolver = getBookieAddressResolver();
+ if (addressResolver == null) {
+ LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
} else {
- LOG.info("Network address for {} is unresolvable yet.", addr);
+ BookieSocketAddress bsa = addressResolver.resolve(bookieId);
+ newRacksWithHost.updateBookie(group, bsa.toString(), bi);
+
+ String hostname = bsa.getSocketAddress().getHostName();
+ newBookieInfoMap.put(hostname, bi);
+
+ InetAddress address = bsa.getSocketAddress().getAddress();
+ if (null != address) {
+ String hostIp = address.getHostAddress();
+ if (null != hostIp) {
+ newBookieInfoMap.put(hostIp, bi);
+ }
+ } else {
+ LOG.info("Network address for {} is unresolvable yet.", addr);
+ }
}
} catch (BookieAddressResolver.BookieIdNotResolvedException e) {
LOG.info("Network address for {} is unresolvable yet. error is {}", addr, e);