You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/16 18:03:13 UTC

[pulsar] branch master updated: [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a34b871620 [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)
1a34b871620 is described below

commit 1a34b8716205ef6f03839b4843de60d8f6e9a509
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Fri Sep 16 20:03:00 2022 +0200

    [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions (#17690)
    
    * [fix][functions] Ensure InternalConfigurationData data model is compatible across different versions
    
    * style
    
    * fix the other way
    
    ### Motivation
    
    After https://github.com/apache/pulsar/pull/14384, the broker and the client expects that the `InternalConfigurationData` contains `metadataStoreUrl` and `configurationMetadataStoreUrl` fields.
    However the broker is no more compatible with old clients.
    
    
    https://github.com/apache/pulsar/pull/14384 is landed to branch-2.11 and [2.10.2](https://github.com/apache/pulsar/pull/17291)
    
    Example scenario:
    - broker on 2.10.1
    - function worker on 2.10.1
    
    1. upgrade fn worker to 2.11.0 or 2.10.2
    2. the fn worker starts and download the internal config from the broker
    3. broker serves a json with old fields (`zookeeperServers` and `configurationStoreServers`)
    4. fn worker reads the json and convert it to a `InternalConfigurationData` instance. It expects to see the fields filled `metadataStoreUrl` and `configurationMetadataStoreUrl` but they aren't
    5. NPE on fn worker
    ```
    2022-09-15T17:42:16,072+0000 [main] INFO  org.apache.pulsar.functions.worker.PulsarWorkerService - Initializing Pulsar Functions namespace...
    2022-09-15T17:42:16,192+0000 [main] ERROR org.apache.pulsar.functions.worker.FunctionWorkerStarter - Encountered error in function worker.
    java.lang.NullPointerException: null
        at org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL(MetadataStoreFactoryImpl.java:73)
        at org.apache.pulsar.functions.worker.WorkerUtils.initializeDlogNamespace(WorkerUtils.java:188)
        at org.apache.pulsar.functions.worker.PulsarWorkerService.initializeStandaloneWorkerService(PulsarWorkerService.java:281)
        at org.apache.pulsar.functions.worker.PulsarWorkerService.initAsStandalone(PulsarWorkerService.java:208)
        at org.apache.pulsar.functions.worker.Worker.start(Worker.java:54)
        at org.apache.pulsar.functions.worker.FunctionWorkerStarter.main(FunctionWorkerStarter.java:76)
    ```
    
    Additionaly there's the same issue if we upgrade the broker before the fn worker:
    1. the broker gets the upgrade. won't serve `zookeeperServers` field
    2. fn worker restarts for some reasons.
    3. fn worker gets the internal config and look for  `zookeeperServers` field which is empty in the json
    4. NPE
    
    ### Modifications
    
    * Restore old fields in `InternalConfigurationData` and add fallback the old values in the new fields getters
    * Added unit test
    
    - [x] `doc-not-needed`
---
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 47 ++++++++++++++++++++++
 .../common/conf/InternalConfigurationData.java     | 38 ++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 9bbcf18b9e9..d035d7c4290 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -51,6 +51,8 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -87,9 +89,11 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.AllocatorStats;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
@@ -205,6 +209,49 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(response, expectedData);
     }
 
+    @Data
+    @AllArgsConstructor
+    /**
+     * Internal configuration data model before  (before https://github.com/apache/pulsar/pull/14384).
+     */
+    private static class OldInternalConfigurationData {
+        private String zookeeperServers;
+        private String configurationStoreServers;
+        @Deprecated
+        private String ledgersRootPath;
+        private String bookkeeperMetadataServiceUri;
+        private String stateStorageServiceUrl;
+    }
+
+    /**
+     * This test verifies that the model data changes in InternalConfigurationData are retro-compatible.
+     * InternalConfigurationData is downloaded from the Function worker from a broker.
+     * The broker may be still serve an "old" version of InternalConfigurationData
+     * (before https://github.com/apache/pulsar/pull/14384) while the Worker already uses the new one.
+     * @throws Exception
+     */
+    @Test
+    public void internalConfigurationRetroCompatibility() throws Exception {
+        OldInternalConfigurationData oldDataModel = new OldInternalConfigurationData(
+                MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL(conf.getMetadataStoreUrl()),
+                conf.getConfigurationMetadataStoreUrl(),
+                new ClientConfiguration().getZkLedgersRootPath(),
+                conf.isBookkeeperMetadataStoreSeparated() ? conf.getBookkeeperMetadataStoreUrl() : null,
+                pulsar.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
+
+        final Map<String, Object> oldDataJson = ObjectMapperFactory
+                .getThreadLocal().convertValue(oldDataModel, Map.class);
+
+        final InternalConfigurationData newData = ObjectMapperFactory.getThreadLocal()
+                .convertValue(oldDataJson, InternalConfigurationData.class);
+
+        assertEquals(newData.getMetadataStoreUrl(), conf.getMetadataStoreUrl());
+        assertEquals(newData.getConfigurationMetadataStoreUrl(), oldDataModel.getConfigurationStoreServers());
+        assertEquals(newData.getLedgersRootPath(), oldDataModel.getLedgersRootPath());
+        assertEquals(newData.getBookkeeperMetadataServiceUri(), oldDataModel.getBookkeeperMetadataServiceUri());
+        assertEquals(newData.getStateStorageServiceUrl(), oldDataModel.getStateStorageServiceUrl());
+    }
+
     @Test
     @SuppressWarnings("unchecked")
     public void clusters() throws Exception {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index 2927aad8afe..1099debfc17 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -27,6 +27,10 @@ import lombok.ToString;
 @ToString
 public class InternalConfigurationData {
 
+    @Deprecated
+    private String zookeeperServers;
+    @Deprecated
+    private String configurationStoreServers;
     private String metadataStoreUrl;
     private String configurationMetadataStoreUrl;
     @Deprecated
@@ -43,18 +47,48 @@ public class InternalConfigurationData {
                                      String bookkeeperMetadataServiceUri,
                                      String stateStorageServiceUrl) {
         this.metadataStoreUrl = zookeeperServers;
+        this.zookeeperServers = zookeeperServers;
         this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
+        this.configurationStoreServers = configurationMetadataStoreUrl;
         this.ledgersRootPath = ledgersRootPath;
         this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
     }
 
+    @Deprecated
+    public String getZookeeperServers() {
+        return zookeeperServers;
+    }
+
+    @Deprecated
+    public void setZookeeperServers(String zookeeperServers) {
+        this.zookeeperServers = zookeeperServers;
+    }
+
+    @Deprecated
+    public String getConfigurationStoreServers() {
+        return configurationStoreServers;
+    }
+
+    @Deprecated
+    public void setConfigurationStoreServers(String configurationStoreServers) {
+        this.configurationStoreServers = configurationStoreServers;
+    }
+
     public String getMetadataStoreUrl() {
-        return metadataStoreUrl;
+        if (metadataStoreUrl != null) {
+            return metadataStoreUrl;
+        } else if (zookeeperServers != null) {
+            return "zk:" + zookeeperServers;
+        }
+        return null;
     }
 
     public String getConfigurationMetadataStoreUrl() {
-        return configurationMetadataStoreUrl;
+        if (configurationMetadataStoreUrl != null) {
+            return configurationMetadataStoreUrl;
+        }
+        return configurationStoreServers;
     }
 
     /** @deprecated */