You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/16 18:03:13 UTC
[pulsar] branch master updated: [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a34b871620 [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)
1a34b871620 is described below
commit 1a34b8716205ef6f03839b4843de60d8f6e9a509
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Sep 16 20:03:00 2022 +0200
[fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)
* [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions
* style
* fix the other way
### Motivation
After https://github.com/apache/pulsar/pull/14384, the broker and the client expect `InternalConfigurationData` to contain the `metadataStoreUrl` and `configurationMetadataStoreUrl` fields.
However, the broker is no longer compatible with old clients.
https://github.com/apache/pulsar/pull/14384 has landed in branch-2.11 and in [2.10.2](https://github.com/apache/pulsar/pull/17291).
Example scenario:
- broker on 2.10.1
- function worker on 2.10.1
1. upgrade the fn worker to 2.11.0 or 2.10.2
2. the fn worker starts and downloads the internal config from the broker
3. the broker serves a JSON with the old fields (`zookeeperServers` and `configurationStoreServers`)
4. the fn worker reads the JSON and converts it to an `InternalConfigurationData` instance. It expects the `metadataStoreUrl` and `configurationMetadataStoreUrl` fields to be filled, but they aren't
5. NPE on the fn worker
```
2022-09-15T17:42:16,072+0000 [main] INFO org.apache.pulsar.functions.worker.PulsarWorkerService - Initializing Pulsar Functions namespace...
2022-09-15T17:42:16,192+0000 [main] ERROR org.apache.pulsar.functions.worker.FunctionWorkerStarter - Encountered error in function worker.
java.lang.NullPointerException: null
at org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL(MetadataStoreFactoryImpl.java:73)
at org.apache.pulsar.functions.worker.WorkerUtils.initializeDlogNamespace(WorkerUtils.java:188)
at org.apache.pulsar.functions.worker.PulsarWorkerService.initializeStandaloneWorkerService(PulsarWorkerService.java:281)
at org.apache.pulsar.functions.worker.PulsarWorkerService.initAsStandalone(PulsarWorkerService.java:208)
at org.apache.pulsar.functions.worker.Worker.start(Worker.java:54)
at org.apache.pulsar.functions.worker.FunctionWorkerStarter.main(FunctionWorkerStarter.java:76)
```
Additionally, there's the same issue if we upgrade the broker before the fn worker:
1. the broker gets upgraded and no longer serves the `zookeeperServers` field
2. the fn worker restarts for some reason
3. the fn worker gets the internal config and looks for the `zookeeperServers` field, which is not populated in the JSON
4. NPE
### Modifications
* Restore the old fields in `InternalConfigurationData` and make the getters of the new fields fall back to the old values
* Add a unit test
- [x] `doc-not-needed`
---
.../org/apache/pulsar/broker/admin/AdminTest.java | 47 ++++++++++++++++++++++
.../common/conf/InternalConfigurationData.java | 38 ++++++++++++++++-
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 9bbcf18b9e9..d035d7c4290 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -51,6 +51,8 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -87,9 +89,11 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
@@ -205,6 +209,49 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(response, expectedData);
}
+ @Data
+ @AllArgsConstructor
+ /**
+ * Internal configuration data model before (before https://github.com/apache/pulsar/pull/14384).
+ */
+ private static class OldInternalConfigurationData {
+ private String zookeeperServers;
+ private String configurationStoreServers;
+ @Deprecated
+ private String ledgersRootPath;
+ private String bookkeeperMetadataServiceUri;
+ private String stateStorageServiceUrl;
+ }
+
+ /**
+ * This test verifies that the model data changes in InternalConfigurationData are retro-compatible.
+ * InternalConfigurationData is downloaded from the Function worker from a broker.
+ * The broker may be still serve an "old" version of InternalConfigurationData
+ * (before https://github.com/apache/pulsar/pull/14384) while the Worker already uses the new one.
+ * @throws Exception
+ */
+ @Test
+ public void internalConfigurationRetroCompatibility() throws Exception {
+ OldInternalConfigurationData oldDataModel = new OldInternalConfigurationData(
+ MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL(conf.getMetadataStoreUrl()),
+ conf.getConfigurationMetadataStoreUrl(),
+ new ClientConfiguration().getZkLedgersRootPath(),
+ conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null,
+ pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
+
+ final Map<String, Object> oldDataJson = ObjectMapperFactory
+ .getThreadLocal().convertValue(oldDataModel, Map.class);
+
+ final InternalConfigurationData newData = ObjectMapperFactory.getThreadLocal()
+ .convertValue(oldDataJson, InternalConfigurationData.class);
+
+ assertEquals(newData.getMetadataStoreUrl(), conf.getMetadataStoreUrl());
+ assertEquals(newData.getConfigurationMetadataStoreUrl(), oldDataModel.getConfigurationStoreServers());
+ assertEquals(newData.getLedgersRootPath(), oldDataModel.getLedgersRootPath());
+ assertEquals(newData.getBookkeeperMetadataServiceUri(), oldDataModel.getBookkeeperMetadataServiceUri());
+ assertEquals(newData.getStateStorageServiceUrl(), oldDataModel.getStateStorageServiceUrl());
+ }
+
@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 2927aad8afe..1099debfc17 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -27,6 +27,10 @@ import lombok.ToString;
@ToString
public class InternalConfigurationData {
+ @Deprecated
+ private String zookeeperServers;
+ @Deprecated
+ private String configurationStoreServers;
private String metadataStoreUrl;
private String configurationMetadataStoreUrl;
@Deprecated
@@ -43,18 +47,48 @@ public class InternalConfigurationData {
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.metadataStoreUrl = zookeeperServers;
+ this.zookeeperServers = zookeeperServers;
this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
+ this.configurationStoreServers = configurationMetadataStoreUrl;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}
+ @Deprecated
+ public String getZookeeperServers() {
+ return zookeeperServers;
+ }
+
+ @Deprecated
+ public void setZookeeperServers(String zookeeperServers) {
+ this.zookeeperServers = zookeeperServers;
+ }
+
+ @Deprecated
+ public String getConfigurationStoreServers() {
+ return configurationStoreServers;
+ }
+
+ @Deprecated
+ public void setConfigurationStoreServers(String configurationStoreServers) {
+ this.configurationStoreServers = configurationStoreServers;
+ }
+
public String getMetadataStoreUrl() {
- return metadataStoreUrl;
+ if (metadataStoreUrl != null) {
+ return metadataStoreUrl;
+ } else if (zookeeperServers != null) {
+ return "zk:" + zookeeperServers;
+ }
+ return null;
}
public String getConfigurationMetadataStoreUrl() {
- return configurationMetadataStoreUrl;
+ if (configurationMetadataStoreUrl != null) {
+ return configurationMetadataStoreUrl;
+ }
+ return configurationStoreServers;
}
/** @deprecated */