You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/12 19:53:00 UTC

[incubator-pulsar] branch master updated: Allow to configure BK digest type in broker configuration (#1369)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e543a77  Allow to configure BK digest type in broker configuration (#1369)
e543a77 is described below

commit e543a77c99661f72e9e59c92963202419b8d13ed
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 12 12:52:57 2018 -0700

    Allow to configure BK digest type in broker configuration (#1369)
---
 conf/broker.conf                                   |  4 +++
 conf/standalone.conf                               |  4 +++
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  2 +-
 .../mledger/impl/ManagedLedgerBkTest.java          | 35 +++++++++++++++++++++-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  1 +
 .../apache/pulsar/broker/ServiceConfiguration.java | 30 ++++++++++++++-----
 .../PulsarConfigurationLoaderTest.java             |  3 ++
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  1 +
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../org/apache/pulsar/common/util/FieldParser.java | 13 ++++++++
 11 files changed, 85 insertions(+), 12 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 55e6f19..59a19d7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -285,6 +285,10 @@ managedLedgerDefaultWriteQuorum=2
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=2
 
+# Default type of checksum to use when writing to BookKeeper. Default is "CRC32"
+# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+managedLedgerDigestType=CRC32
+
 # Amount of memory to use for caching data payload in managed ledger. This memory
 # is allocated from JVM direct memory and it's shared across all the topics
 # running  in the same broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index de04064..dc6f1b7 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -250,6 +250,10 @@ managedLedgerDefaultWriteQuorum=1
 # Number of guaranteed copies (acks to wait before write is complete)
 managedLedgerDefaultAckQuorum=1
 
+# Default type of checksum to use when writing to BookKeeper. Default is "CRC32"
+# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+managedLedgerDigestType=CRC32
+
 # Amount of memory to use for caching data payload in managed ledger. This memory
 # is allocated from JVM direct memory and it's shared across all the topics
 # running  in the same broker
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index bbac80a..ed4ccb3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -51,7 +51,7 @@ public class ManagedLedgerConfig {
     private long retentionSizeInMB = 0;
     private boolean autoSkipNonRecoverableData;
 
-    private DigestType digestType = DigestType.MAC;
+    private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ee49810..312b894 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2515,7 +2515,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         // verify cursor-ledger's last entry has individual-deleted positions
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicInteger individualDeletedMessagesCount = new AtomicInteger(0);
-        bkc.asyncOpenLedger(c1.getCursorLedger(), DigestType.MAC, "".getBytes(), (rc, lh, ctx) -> {
+        bkc.asyncOpenLedger(c1.getCursorLedger(), DigestType.CRC32C, "".getBytes(), (rc, lh, ctx) -> {
             if (rc == BKException.Code.OK) {
                 long lastEntry = lh.getLastAddConfirmed();
                 lh.asyncReadEntries(lastEntry, lastEntry, (rc1, lh1, seq, ctx1) -> {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 59d35d6..3035c28 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -345,7 +345,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
 
         // Trigger the closure of the data ledger
-        bkc.openLedger(p1.getLedgerId(), DigestType.MAC, new byte[] {});
+        bkc.openLedger(p1.getLedgerId(), DigestType.CRC32C, new byte[] {});
 
         ledger.addEntry("entry-2".getBytes());
 
@@ -502,4 +502,37 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         factory.shutdown();
     }
 
+    @Test
+    public void testChangeCrcType() throws Exception {
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
+        config.setDigestType(DigestType.CRC32);
+        ManagedLedger ledger = factory.open("my_test_ledger", config);
+        ManagedCursor c1 = ledger.openCursor("c1");
+
+        ledger.addEntry("entry-0".getBytes());
+        ledger.addEntry("entry-1".getBytes());
+        ledger.addEntry("entry-2".getBytes());
+
+        ledger.close();
+
+        config.setDigestType(DigestType.CRC32C);
+        ledger = factory.open("my_test_ledger", config);
+        c1 = ledger.openCursor("c1");
+
+        ledger.addEntry("entry-3".getBytes());
+
+        assertEquals(c1.getNumberOfEntries(), 4);
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+
+        List<Entry> entries = c1.readEntries(4);
+        assertEquals(entries.size(), 4);
+        for (int i = 0; i < 4; i++) {
+            assertEquals(new String(entries.get(i).getData()), "entry-" + i);
+        }
+
+        factory.shutdown();
+    }
+
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 685dd92..1680f71 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -139,6 +139,7 @@ public abstract class BookKeeperClusterTestCase {
     protected void startBKCluster() throws Exception {
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
         baseClientConf.setUseV2WireProtocol(true);
+        baseClientConf.setEnableDigestTypeAutodetection(true);
         if (numBookies > 0) {
             bkc = new BookKeeperTestClient(baseClientConf);
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cc2a409..f851f7d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -201,7 +202,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // Specify the tls cipher the broker will use to negotiate during TLS Handshake.
     // Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
     private Set<String> tlsCiphers = Sets.newTreeSet();
-    
+
     /***** --- Authentication --- ****/
     // Enable authentication
     private boolean authenticationEnabled = false;
@@ -222,7 +223,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private Set<String> proxyRoles = Sets.newTreeSet();
 
     // If this flag is set then the broker authenticates the original Auth data
-    // else it just accepts the originalPrincipal and authorizes it (if required). 
+    // else it just accepts the originalPrincipal and authorizes it (if required).
     private boolean authenticateOriginalAuthData = false;
 
     // Allow wildcard matching in authorization
@@ -236,7 +237,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String brokerClientAuthenticationParameters = "";
     // Path for the trusted TLS certificate file for outgoing connection to a server (broker)
     private String brokerClientTrustCertsFilePath = "";
-    
+
     // When this parameter is not empty, unauthenticated users perform as anonymousUserRole
     private String anonymousUserRole = null;
 
@@ -278,6 +279,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // Number of guaranteed copies (acks to wait before write is complete)
     @FieldContext(minValue = 1)
     private int managedLedgerDefaultAckQuorum = 1;
+
+    // Default type of checksum to use when writing to BookKeeper. Default is "CRC32"
+    // Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+    private DigestType managedLedgerDigestType = DigestType.CRC32;
+
     // Max number of bookies to use when creating a ledger
     @FieldContext(minValue = 1)
     private int managedLedgerMaxEnsembleSize = 5;
@@ -864,15 +870,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public Set<String> getSuperUserRoles() {
         return superUserRoles;
     }
- 
+
     public Set<String> getProxyRoles() {
         return proxyRoles;
     }
-    
+
     public void setProxyRoles(Set<String> proxyRoles) {
         this.proxyRoles = proxyRoles;
     }
-    
+
     public boolean getAuthorizationAllowWildcardsMatching() {
         return authorizationAllowWildcardsMatching;
     }
@@ -908,7 +914,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public void setBrokerClientTrustCertsFilePath(String brokerClientTrustCertsFilePath) {
         this.brokerClientTrustCertsFilePath = brokerClientTrustCertsFilePath;
     }
-    
+
     public String getAnonymousUserRole() {
         return anonymousUserRole;
     }
@@ -1031,6 +1037,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.managedLedgerDefaultAckQuorum = managedLedgerDefaultAckQuorum;
     }
 
+    public DigestType getManagedLedgerDigestType() {
+        return managedLedgerDigestType;
+    }
+
+    public void setManagedLedgerDigestType(DigestType managedLedgerDigestType) {
+        this.managedLedgerDigestType = managedLedgerDigestType;
+    }
+
     public int getManagedLedgerMaxEnsembleSize() {
         return managedLedgerMaxEnsembleSize;
     }
@@ -1467,7 +1481,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
         this.authenticateOriginalAuthData = authenticateOriginalAuthData;
     }
-    
+
     public Set<String> getTlsProtocols() {
         return tlsProtocols;
     }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 5ff95dc..038fb4d 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -32,6 +32,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.Properties;
 
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.testng.annotations.Test;
 
@@ -100,6 +101,7 @@ public class PulsarConfigurationLoaderTest {
         printWriter.println("superUserRoles=appid1,appid2");
         printWriter.println("brokerServicePort=7777");
         printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0");
+        printWriter.println("managedLedgerDigestType=CRC32C");
         printWriter.close();
         testConfigFile.deleteOnExit();
         InputStream stream = new FileInputStream(testConfigFile);
@@ -111,6 +113,7 @@ public class PulsarConfigurationLoaderTest {
         assertEquals(serviceConfig.getClusterName(), "usc");
         assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
         assertEquals(serviceConfig.getBrokerServicePort(), 7777);
+        assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
     }
 
     @Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 954d4ef..0677800 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -53,6 +53,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         bkConf.setSpeculativeReadTimeout(conf.getBookkeeperClientSpeculativeReadTimeoutInMillis());
         bkConf.setNumChannelsPerBookie(16);
         bkConf.setUseV2WireProtocol(true);
+        bkConf.setEnableDigestTypeAutodetection(true);
         bkConf.setLedgerManagerFactoryClassName(HierarchicalLedgerManagerFactory.class.getName());
         if (conf.isBookkeeperClientHealthCheckEnabled()) {
             bkConf.enableBookieHealthCheck();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bbdc696..577fab3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -674,7 +674,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
             managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
             managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-            managedLedgerConfig.setDigestType(DigestType.CRC32);
+            managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
 
             managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
             managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index f6f4fc1..e476fc5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.common.util;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
 
+import com.fasterxml.jackson.databind.util.EnumResolver;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
@@ -90,6 +92,17 @@ public final class FieldParser {
         // Lookup the suitable converter.
         String converterId = from.getClass().getName() + "_" + to.getName();
         Method converter = CONVERTERS.get(converterId);
+
+        if (to.isEnum()) {
+            // Converting string to enum
+            EnumResolver r = EnumResolver.constructUsingToString((Class<Enum<?>>) to, null);
+            T value = (T) r.findEnum((String) from);
+            if (value == null) {
+                throw new RuntimeException("Invalid value '" + from + "' for enum " + to);
+            }
+            return value;
+        }
+
         if (converter == null) {
             throw new UnsupportedOperationException("Cannot convert from " + from.getClass().getName() + " to "
                     + to.getName() + ". Requested converter does not exist.");

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.