You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/12/15 11:19:47 UTC
[pulsar] branch branch-2.9 updated: Fix incompatibility of BacklogQuota (#13291)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4e4c787 Fix incompatibility of BacklogQuota (#13291)
4e4c787 is described below
commit 4e4c7877d1ef0add6727cd1b58063222d152ed71
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Wed Dec 15 19:16:15 2021 +0800
Fix incompatibility of BacklogQuota (#13291)
(cherry picked from commit 4e89c2bd93bab682d288cf8d069fd77a2c9cb1e4)
---
.../pulsar/common/policies/data/BacklogQuota.java | 9 +++
.../policies/data/impl/BacklogQuotaImpl.java | 77 ++++++++++++++++++++--
.../common/policies/data/BacklogQuotaMixIn.java | 26 --------
.../pulsar/common/util/ObjectMapperFactory.java | 2 -
.../common/util/ObjectMapperFactoryTest.java | 27 --------
.../metadata/BacklogQuotaCompatibilityTest.java | 60 +++++++++++++++--
6 files changed, 135 insertions(+), 66 deletions(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
index d4b5c4b..4604710 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuota.java
@@ -30,6 +30,15 @@ public interface BacklogQuota {
/**
* Gets quota limit in size.
+ * Retained for backward compatibility.
+ *
+ * @return quota limit in bytes
+ */
+ @Deprecated
+ long getLimit();
+
+ /**
+ * Gets quota limit in size.
*
* @return quota limit in bytes
*/
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
index 591e8b8..3d97fa0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java
@@ -18,23 +18,86 @@
*/
package org.apache.pulsar.common.policies.data.impl;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.ToString;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-@Data
-@AllArgsConstructor
+@ToString
+@EqualsAndHashCode
@NoArgsConstructor
public class BacklogQuotaImpl implements BacklogQuota {
public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024;
- // backlog quota by size in byte
- private long limitSize;
- // backlog quota by time in second
+ /**
+ * Backlog quota by size in bytes; retained for backward compatibility.
+ * for the details: https://github.com/apache/pulsar/pull/13291
+ * @since 2.9.1
+ */
+ @Deprecated
+ private Long limit;
+
+ /**
+ * backlog quota by size in byte.
+ */
+ private Long limitSize;
+
+ /**
+ * backlog quota by time in second.
+ */
private int limitTime;
private RetentionPolicy policy;
+ public BacklogQuotaImpl(long limitSize, int limitTime, RetentionPolicy policy) {
+ this.limitSize = limitSize;
+ this.limitTime = limitTime;
+ this.policy = policy;
+ }
+
+ @Deprecated
+ public long getLimit() {
+ if (limitSize == null) {
+ // assumes limitSize and limit are never both null; if both are unset, unboxing limit throws NullPointerException
+ return limit;
+ }
+ return limitSize;
+ }
+
+ @Deprecated
+ public void setLimit(long limit) {
+ this.limit = limit;
+ this.limitSize = limit;
+ }
+
+ public long getLimitSize() {
+ if (limitSize == null) {
+ // assumes limitSize and limit are never both null; if both are unset, unboxing limit throws NullPointerException
+ return limit;
+ }
+ return limitSize;
+ }
+
+ public void setLimitSize(long limitSize) {
+ this.limitSize = limitSize;
+ this.limit = limitSize;
+ }
+
+ public int getLimitTime() {
+ return limitTime;
+ }
+
+ public void setLimitTime(int limitTime) {
+ this.limitTime = limitTime;
+ }
+
+ public RetentionPolicy getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(RetentionPolicy policy) {
+ this.policy = policy;
+ }
+
public static BacklogQuotaImplBuilder builder() {
return new BacklogQuotaImplBuilder();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
deleted file mode 100644
index a156240..0000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BacklogQuotaMixIn.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.policies.data;
-
-import com.fasterxml.jackson.annotation.JsonAlias;
-
-public abstract class BacklogQuotaMixIn {
- @JsonAlias("limit")
- private long limitSize;
-}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 94e1b7a..ef2e489 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BacklogQuotaMixIn;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
@@ -192,7 +191,6 @@ public class ObjectMapperFactory {
resolver.addMapping(AutoSubscriptionCreationOverride.class, AutoSubscriptionCreationOverrideImpl.class);
// we use MixIn class to add jackson annotations
- mapper.addMixIn(BacklogQuotaImpl.class, BacklogQuotaMixIn.class);
mapper.addMixIn(ResourceQuota.class, ResourceQuotaMixIn.class);
mapper.addMixIn(FunctionConfig.class, JsonIgnorePropertiesMixIn.class);
mapper.addMixIn(FunctionState.class, JsonIgnorePropertiesMixIn.class);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
index dc8aa8b..f9e3729 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ObjectMapperFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.common.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.ToString;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.stats.Metrics;
@@ -28,32 +27,6 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class ObjectMapperFactoryTest {
- @Test
- public void testBacklogQuotaMixIn() {
- ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
- String json = "{\"limit\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
- try {
- BacklogQuota backlogQuota = objectMapper.readValue(json, BacklogQuota.class);
- Assert.assertEquals(backlogQuota.getLimitSize(), 10);
- Assert.assertEquals(backlogQuota.getLimitTime(), 0);
- Assert.assertEquals(backlogQuota.getPolicy(), BacklogQuota.RetentionPolicy.producer_request_hold);
- } catch (Exception ex) {
- Assert.fail("shouldn't have thrown exception", ex);
- }
-
- try {
- String expectJson = "{\"limitSize\":10,\"limitTime\":0,\"policy\":\"producer_request_hold\"}";
- BacklogQuota backlogQuota = BacklogQuota.builder()
- .limitSize(10)
- .limitTime(0)
- .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold)
- .build();
- String writeJson = objectMapper.writeValueAsString(backlogQuota);
- Assert.assertEquals(expectJson, writeJson);
- } catch (Exception ex) {
- Assert.fail("shouldn't have thrown exception", ex);
- }
- }
@Test
public void testResourceQuotaMixIn() {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
index 06765e3..659825c 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BacklogQuotaCompatibilityTest.java
@@ -19,15 +19,70 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
+
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import java.io.IOException;
+import java.util.HashMap;
+
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class BacklogQuotaCompatibilityTest {
+ private final JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(Policies.class, null);
+
+ private final JSONMetadataSerdeSimpleType<Policies> simpleType = new JSONMetadataSerdeSimpleType<>(typeRef);
+
+ private final BacklogQuota.RetentionPolicy testPolicy = BacklogQuota.RetentionPolicy.consumer_backlog_eviction;
+
+ @Test
+ public void testV27ClientSetV28BrokerRead() throws Exception {
+ Policies writePolicy = new Policies();
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimit(1024);
+ writeBacklogQuota.setLimitTime(60);
+ writeBacklogQuota.setPolicy(testPolicy);
+ HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
+ quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
+ writePolicy.backlog_quota_map = quotaHashMap;
+ byte[] serialize = simpleType.serialize("/path", writePolicy);
+ Policies policies = simpleType.deserialize("/path", serialize, null);
+ BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+ Assert.assertEquals(readBacklogQuota.getLimitSize(), 1024);
+ Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+ Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+ }
+
+ @Test
+ public void testV28ClientSetV28BrokerRead() throws Exception {
+ Policies writePolicy = new Policies();
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimitSize(1024);
+ writeBacklogQuota.setLimitTime(60);
+ writeBacklogQuota.setPolicy(testPolicy);
+ HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> quotaHashMap = new HashMap<>();
+ quotaHashMap.put(BacklogQuota.BacklogQuotaType.destination_storage, writeBacklogQuota);
+ writePolicy.backlog_quota_map = quotaHashMap;
+ byte[] serialize = simpleType.serialize("/path", writePolicy);
+ Policies policies = simpleType.deserialize("/path", serialize, null);
+ BacklogQuota readBacklogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+ Assert.assertEquals(readBacklogQuota.getLimit(), 1024);
+ Assert.assertEquals(readBacklogQuota.getLimitTime(), 60);
+ Assert.assertEquals(readBacklogQuota.getPolicy(), testPolicy);
+ }
+
+ @Test
+ public void testV28ClientSetV27BrokerRead() {
+ BacklogQuotaImpl writeBacklogQuota = new BacklogQuotaImpl();
+ writeBacklogQuota.setLimitSize(1024);
+ Assert.assertEquals(1024, writeBacklogQuota.getLimit());
+ }
+
@Test
public void testBackwardCompatibility() throws IOException {
String oldPolicyStr = "{\"auth_policies\":{\"namespace_auth\":{},\"destination_auth\":{},"
@@ -41,10 +96,7 @@ public class BacklogQuotaCompatibilityTest {
+ "\"schema_auto_update_compatibility_strategy\":\"Full\",\"schema_compatibility_strategy\":"
+ "\"UNDEFINED\",\"is_allow_auto_update_schema\":true,\"schema_validation_enforced\":false,"
+ "\"subscription_types_enabled\":[]}\n";
-
- JSONMetadataSerdeSimpleType jsonMetadataSerdeSimpleType = new JSONMetadataSerdeSimpleType(
- TypeFactory.defaultInstance().constructSimpleType(Policies.class, null));
- Policies policies = (Policies) jsonMetadataSerdeSimpleType.deserialize(null, oldPolicyStr.getBytes(), null);
+ Policies policies = simpleType.deserialize(null, oldPolicyStr.getBytes(), null);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
1001);
assertEquals(policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage).getLimitTime(),