You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/12 19:58:58 UTC

[pulsar] branch master updated: Avoid introducing bookkeeper-common into the pulsar-common (#9551)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 18e61b3  Avoid introducing bookkeeper-common into the pulsar-common (#9551)
18e61b3 is described below

commit 18e61b3989df66c6789574f72527144ff5fda25e
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sat Feb 13 03:58:25 2021 +0800

    Avoid introducing bookkeeper-common into the pulsar-common (#9551)
    
    * Avoid introduce bookkeeper-common into the pulsar-common
    ---
    
    *Motivation*
    
    Direct using jackson to parse json to avoid introduce the bookkeeper-common
    into the pulsar-common. That will make other modules which are using pulsar-common
    has some unused bookkeeper dependencies.
    
    * Fix the build and add some tests
    
    * Address comments
---
 .../mledger/impl/LedgerMetadataUtils.java          |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 +-
 pulsar-common/pom.xml                              |  5 --
 .../data/EnsemblePlacementPolicyConfig.java        | 29 ++++++++++--
 .../data/EnsemblePlacementPolicyConfigTest.java    | 54 ++++++++++++++++++++++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   |  3 +-
 6 files changed, 87 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 1f59603..5edef0f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
@@ -116,7 +117,8 @@ public final class LedgerMetadataUtils {
      *          placement policy configuration encode error
      */
     static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
-        Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException {
+        Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties)
+        throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
         EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties);
         return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bda3f97..0e5ba68 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
@@ -124,6 +126,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.Stat;
@@ -3495,7 +3498,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     config.getBookKeeperEnsemblePlacementPolicyClassName(),
                     config.getBookKeeperEnsemblePlacementPolicyProperties()
                 ));
-            } catch (JsonUtil.ParseJsonException e) {
+            } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
                 cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
                 return;
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index b15470f..1abf107 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -90,11 +90,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>bookkeeper-common</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
index 2c42f14..64a0587 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
@@ -18,8 +18,9 @@
  */
 package org.apache.pulsar.common.policies.data;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Objects;
-import org.apache.bookkeeper.common.util.JsonUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -66,11 +67,29 @@ public class EnsemblePlacementPolicyConfig {
         return false;
     }
 
-    public byte[] encode() throws JsonUtil.ParseJsonException {
-        return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8);
+    public byte[] encode() throws ParseEnsemblePlacementPolicyConfigException {
+        try {
+            return ObjectMapperFactory.getThreadLocal()
+                .writerWithDefaultPrettyPrinter()
+                .writeValueAsString(this)
+                .getBytes(StandardCharsets.UTF_8);
+        } catch (JsonProcessingException e) {
+            throw new ParseEnsemblePlacementPolicyConfigException("Failed to encode to json", e);
+        }
     }
 
-    public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException {
-        return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
+    public static EnsemblePlacementPolicyConfig decode(byte[] data) throws ParseEnsemblePlacementPolicyConfigException {
+        try {
+            return ObjectMapperFactory.getThreadLocal()
+                .readValue(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
+        } catch (JsonProcessingException e) {
+            throw new ParseEnsemblePlacementPolicyConfigException("Failed to decode from json", e);
+        }
+    }
+
+    public static class ParseEnsemblePlacementPolicyConfigException extends Exception {
+        ParseEnsemblePlacementPolicyConfigException(String message, Throwable throwable) {
+            super(message, throwable);
+        }
     }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java
new file mode 100644
index 0000000..695b5b5
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+public class EnsemblePlacementPolicyConfigTest {
+
+    static class MockedEnsemblePlacementPolicy {}
+
+    @Test
+    public void testEncodeDecodeSuccessfully()
+        throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
+
+        EnsemblePlacementPolicyConfig originalConfig =
+            new EnsemblePlacementPolicyConfig(MockedEnsemblePlacementPolicy.class, Collections.EMPTY_MAP);
+        byte[] encodedConfig = originalConfig.encode();
+
+        EnsemblePlacementPolicyConfig decodedConfig =
+            EnsemblePlacementPolicyConfig.decode(encodedConfig);
+        Assert.assertEquals(decodedConfig, originalConfig);
+    }
+
+    @Test
+    public void testDecodeFailed() {
+        byte[] configBytes = new byte[0];
+        try {
+            EnsemblePlacementPolicyConfig.decode(configBytes);
+            Assert.fail("should failed parse the config from bytes");
+        } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
+            // expected error
+        }
+    }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 5cca446..e7f393a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -193,7 +194,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
         if (ensemblePlacementPolicyConfigData != null) {
             try {
                 return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData));
-            } catch (JsonUtil.ParseJsonException e) {
+            } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e);
                 return Optional.empty();
             }