You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/12/15 11:19:47 UTC

[pulsar] branch branch-2.9 updated: Fix incompatibility of BacklogQuota (#13291)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4e4c787  Fix incompatibility of BacklogQuota (#13291)
4e4c787 is described below

commit 4e4c7877d1ef0add6727cd1b58063222d152ed71
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Wed Dec 15 19:16:15 2021 +0800

    Fix incompatibility of BacklogQuota (#13291)
    
    (cherry picked from commit 4e89c2bd93bab682d288cf8d069fd77a2c9cb1e4)
---
 .../pulsar/common/policies/data/BacklogQuota.java  |  9 +++
 .../policies/data/impl/BacklogQuotaImpl.java       | 77 ++++++++++++++++++++--
 .../common/policies/data/BacklogQuotaMixIn.java    | 26 --------
 .../pulsar/common/util/ObjectMapperFactory.java    |  2 -
 .../common/util/ObjectMapperFactoryTest.java       | 27 --------
 .../metadata/BacklogQuotaCompatibilityTest.java    | 60 +++++++++++++++--
 6 files changed, 135 insertions(+), 66 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
index d4b5c4b..4604710 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
@@ -30,6 +30,15 @@ public interface BacklogQuota {
 
     /**
      * Gets quota limit in size.
+     * Retained for backward compatibility.
+     *
+     * @return quota limit in bytes
+     */
+    @Deprecated
+    long getLimit();
+
+    /**
+     * Gets quota limit in size.
      *
      * @return quota limit in bytes
      */
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
index 591e8b8..3d97fa0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
@@ -18,23 +18,86 @@
  */
 package org.apache.pulsar.common.policies.data.impl;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import lombok.ToString;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 
-@Data
-@AllArgsConstructor
+@ToString
+@EqualsAndHashCode
 @NoArgsConstructor
 public class BacklogQuotaImpl implements BacklogQuota {
     public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024;
 
-    // backlog quota by size in byte
-    private long limitSize;
-    // backlog quota by time in second
+    /**
+     * Backlog quota by size in bytes; retained for backward compatibility.
+     * For details, see https://github.com/apache/pulsar/pull/13291
+     * @since 2.9.1
+     */
+    @Deprecated
+    private Long limit;
+
+    /**
+     * Backlog quota by size in bytes.
+     */
+    private Long limitSize;
+
+    /**
+     * Backlog quota by time in seconds.
+     */
     private int limitTime;
     private RetentionPolicy policy;
 
+    public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) {
+        this.limitSize = limitSize;
+        this.limitTime = limitTime;
+        this.policy = policy;
+    }
+
+    @Deprecated
+    public long getLimit() {
+        if (limitSize == null) {
+            // the limitSize and limit can't be both null
+            return limit;
+        }
+        return limitSize;
+    }
+
+    @Deprecated
+    public void setLimit(long limit) {
+        this.limit = limit;
+        this.limitSize = limit;
+    }
+
+    public long getLimitSize() {
+        if (limitSize == null) {
+            // the limitSize and limit can't be both null
+            return limit;
+        }
+        return limitSize;
+    }
+
+    public void setLimitSize(long limitSize) {
+        this.limitSize = limitSize;
+        this.limit = limitSize;
+    }
+
+    public int getLimitTime() {
+        return limitTime;
+    }
+
+    public void setLimitTime(int limitTime) {
+        this.limitTime = limitTime;
+    }
+
+    public RetentionPolicy getPolicy() {
+        return policy;
+    }
+
+    public void setPolicy(RetentionPolicy policy) {
+        this.policy = policy;
+    }
+
     public static BacklogQuotaImplBuilder builder() {
         return new BacklogQuotaImplBuilder();
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
deleted file mode 100644
index a156240..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.policies.data;
-
-import com.fasterxml.jackson.annotation.JsonAlias;
-
-public abstract class BacklogQuotaMixIn {
-    @JsonAlias("limit")
-    private long limitSize;
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 94e1b7a..ef2e489 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BacklogQuotaMixIn;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
@@ -192,7 +191,6 @@ public class ObjectMapperFactory {
         resolver.addMapping(AutoSubscriptionCreationOverride.class, AutoSubscriptionCreationOverrideImpl.class);
 
         // we use MixIn class to add jackson annotations
-        mapper.addMixIn(BacklogQuotaImpl.class, BacklogQuotaMixIn.class);
         mapper.addMixIn(ResourceQuota.class, ResourceQuotaMixIn.class);
         mapper.addMixIn(FunctionConfig.class, JsonIgnorePropertiesMixIn.class);
         mapper.addMixIn(FunctionState.class, JsonIgnorePropertiesMixIn.class);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
index dc8aa8b..f9e3729 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.common.util;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.ToString;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.stats.Metrics;
@@ -28,32 +27,6 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class ObjectMapperFactoryTest {
-    @Test
-    public void testBacklogQuotaMixIn() {
-        ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
-        String json = "{\"limit\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
-        try {
-            BacklogQuota backlogQuota = objectMapper.readValue(json, BacklogQuota.class);
-            Assert.assertEquals(backlogQuota.getLimitSize(), 10);
-            Assert.assertEquals(backlogQuota.getLimitTime(), 0);
-            Assert.assertEquals(backlogQuota.getPolicy(), BacklogQuota.RetentionPolicy.producer_request_hold);
-        } catch (Exception ex) {
-            Assert.fail("shouldn't have thrown exception", ex);
-        }
-
-        try {
-            String expectJson = "{\"limitSize\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
-            BacklogQuota backlogQuota = BacklogQuota.builder()
-                    .limitSize(10)
-                    .limitTime(0)
-                    .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
-                    .build();
-            String writeJson = objectMapper.writeValueAsString(backlogQuota);
-            Assert.assertEquals(expectJson, writeJson);
-        } catch (Exception ex) {
-            Assert.fail("shouldn't have thrown exception", ex);
-        }
-    }
 
     @Test
     public void testResourceQuotaMixIn() {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
index 06765e3..659825c 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
@@ -19,15 +19,70 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import java.io.IOException;
+import java.util.HashMap;
+
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class BacklogQuotaCompatibilityTest {
 
+    private final JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(Policies.class, null);
+
+    private final JSONMetadataSerdeSimpleType<Policies> simpleType = new JSONMetadataSerdeSimpleType<>(typeRef);
+
+    private final BacklogQuota.RetentionPolicy testPolicy = BacklogQuota.RetentionPolicy.consumer_backlog_eviction;
+
+    @Test
+    public void testV27ClientSetV28BrokerRead() throws Exception {
+        Policies writePolicy = new Policies();
+        BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+        writeBacklogQuota.setLimit(1024);
+        writeBacklogQuota.setLimitTime(60);
+        writeBacklogQuota.setPolicy(testPolicy);
+        HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
+        quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
+        writePolicy.backlog_quota_map = quotaHashMap;
+        byte[] serialize = simpleType.serialize("/path", writePolicy);
+        Policies policies = simpleType.deserialize("/path", serialize, null);
+        BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024);
+        Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+        Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+    }
+
+    @Test
+    public void testV28ClientSetV28BrokerRead() throws Exception {
+        Policies writePolicy = new Policies();
+        BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+        writeBacklogQuota.setLimitSize(1024);
+        writeBacklogQuota.setLimitTime(60);
+        writeBacklogQuota.setPolicy(testPolicy);
+        HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
+        quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
+        writePolicy.backlog_quota_map = quotaHashMap;
+        byte[] serialize = simpleType.serialize("/path", writePolicy);
+        Policies policies = simpleType.deserialize("/path", serialize, null);
+        BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        Assert.assertEquals(readBacklogQuota.getLimit(), 1024);
+        Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+        Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+    }
+
+    @Test
+    public void testV28ClientSetV27BrokerRead() {
+        BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+        writeBacklogQuota.setLimitSize(1024);
+        Assert.assertEquals(writeBacklogQuota.getLimit(), 1024); // TestNG order: (actual, expected)
+    }
+
     @Test
     public void testBackwardCompatibility() throws IOException {
         String oldPolicyStr = "{\"auth_policies\":{\"namespace_auth\":{},\"destination_auth\":{},"
@@ -41,10 +96,7 @@ public class BacklogQuotaCompatibilityTest {
                 + "\"schema_auto_update_compatibility_strategy\":\"Full\",\"schema_compatibility_strategy\":"
                 + "\"UNDEFINED\",\"is_allow_auto_update_schema\":true,\"schema_validation_enforced\":false,"
                 + "\"subscription_types_enabled\":[]}\n";
-
-        JSONMetadataSerdeSimpleType jsonMetadataSerdeSimpleType = new JSONMetadataSerdeSimpleType(
-                TypeFactory.defaultInstance().constructSimpleType(Policies.class, null));
-        Policies policies = (Policies) jsonMetadataSerdeSimpleType.deserialize(null, oldPolicyStr.getBytes(), null);
+        Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
         assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
                 1001);
         assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),