You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/12 19:58:58 UTC
[pulsar] branch master updated: Avoid introducing bookkeeper-common
into the pulsar-common (#9551)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 18e61b3 Avoid introducing bookkeeper-common into the pulsar-common (#9551)
18e61b3 is described below
commit 18e61b3989df66c6789574f72527144ff5fda25e
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Sat Feb 13 03:58:25 2021 +0800
Avoid introducing bookkeeper-common into the pulsar-common (#9551)
* Avoid introduce bookkeeper-common into the pulsar-common
---
*Motivation*
Direct using jackson to parse json to avoid introduce the bookkeeper-common
into the pulsar-common. That will make other modules which are using pulsar-common
has some unused bookkeeper dependencies.
* Fix the build and add some tests
* Address comments
---
.../mledger/impl/LedgerMetadataUtils.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 +-
pulsar-common/pom.xml | 5 --
.../data/EnsemblePlacementPolicyConfig.java | 29 ++++++++++--
.../data/EnsemblePlacementPolicyConfigTest.java | 54 ++++++++++++++++++++++
.../ZkIsolatedBookieEnsemblePlacementPolicy.java | 3 +-
6 files changed, 87 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 1f59603..5edef0f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
@@ -116,7 +117,8 @@ public final class LedgerMetadataUtils {
* placement policy configuration encode error
*/
static Map<String, byte[]> buildMetadataForPlacementPolicyConfig(
- Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties) throws ParseJsonException {
+ Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties)
+ throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties);
return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bda3f97..0e5ba68 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
@@ -124,6 +126,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
@@ -3495,7 +3498,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties()
));
- } catch (JsonUtil.ParseJsonException e) {
+ } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("[{}] Serialize the placement configuration failed", name, e);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
return;
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index b15470f..1abf107 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -90,11 +90,6 @@
</dependency>
<dependency>
- <groupId>org.apache.bookkeeper</groupId>
- <artifactId>bookkeeper-common</artifactId>
- </dependency>
-
- <dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
index 2c42f14..64a0587 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfig.java
@@ -18,8 +18,9 @@
*/
package org.apache.pulsar.common.policies.data;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Objects;
-import org.apache.bookkeeper.common.util.JsonUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -66,11 +67,29 @@ public class EnsemblePlacementPolicyConfig {
return false;
}
- public byte[] encode() throws JsonUtil.ParseJsonException {
- return JsonUtil.toJson(this).getBytes(StandardCharsets.UTF_8);
+ public byte[] encode() throws ParseEnsemblePlacementPolicyConfigException {
+ try {
+ return ObjectMapperFactory.getThreadLocal()
+ .writerWithDefaultPrettyPrinter()
+ .writeValueAsString(this)
+ .getBytes(StandardCharsets.UTF_8);
+ } catch (JsonProcessingException e) {
+ throw new ParseEnsemblePlacementPolicyConfigException("Failed to encode to json", e);
+ }
}
- public static EnsemblePlacementPolicyConfig decode(byte[] data) throws JsonUtil.ParseJsonException {
- return JsonUtil.fromJson(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
+ public static EnsemblePlacementPolicyConfig decode(byte[] data) throws ParseEnsemblePlacementPolicyConfigException {
+ try {
+ return ObjectMapperFactory.getThreadLocal()
+ .readValue(new String(data, StandardCharsets.UTF_8), EnsemblePlacementPolicyConfig.class);
+ } catch (JsonProcessingException e) {
+ throw new ParseEnsemblePlacementPolicyConfigException("Failed to decode from json", e);
+ }
+ }
+
+ public static class ParseEnsemblePlacementPolicyConfigException extends Exception {
+ ParseEnsemblePlacementPolicyConfigException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
}
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java
new file mode 100644
index 0000000..695b5b5
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/EnsemblePlacementPolicyConfigTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+public class EnsemblePlacementPolicyConfigTest {
+
+ static class MockedEnsemblePlacementPolicy {}
+
+ @Test
+ public void testEncodeDecodeSuccessfully()
+ throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
+
+ EnsemblePlacementPolicyConfig originalConfig =
+ new EnsemblePlacementPolicyConfig(MockedEnsemblePlacementPolicy.class, Collections.EMPTY_MAP);
+ byte[] encodedConfig = originalConfig.encode();
+
+ EnsemblePlacementPolicyConfig decodedConfig =
+ EnsemblePlacementPolicyConfig.decode(encodedConfig);
+ Assert.assertEquals(decodedConfig, originalConfig);
+ }
+
+ @Test
+ public void testDecodeFailed() {
+ byte[] configBytes = new byte[0];
+ try {
+ EnsemblePlacementPolicyConfig.decode(configBytes);
+ Assert.fail("should failed parse the config from bytes");
+ } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
+ // expected error
+ }
+ }
+}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 5cca446..e7f393a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -193,7 +194,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
if (ensemblePlacementPolicyConfigData != null) {
try {
return Optional.ofNullable(EnsemblePlacementPolicyConfig.decode(ensemblePlacementPolicyConfigData));
- } catch (JsonUtil.ParseJsonException e) {
+ } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
LOG.error("Failed to parse the ensemble placement policy config from the custom metadata", e);
return Optional.empty();
}