You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/12 19:53:00 UTC
[incubator-pulsar] branch master updated: Allow to configure BK
digest type in broker configuration (#1369)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e543a77 Allow to configure BK digest type in broker configuration (#1369)
e543a77 is described below
commit e543a77c99661f72e9e59c92963202419b8d13ed
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 12 12:52:57 2018 -0700
Allow to configure BK digest type in broker configuration (#1369)
---
conf/broker.conf | 4 +++
conf/standalone.conf | 4 +++
.../bookkeeper/mledger/ManagedLedgerConfig.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 2 +-
.../mledger/impl/ManagedLedgerBkTest.java | 35 +++++++++++++++++++++-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 1 +
.../apache/pulsar/broker/ServiceConfiguration.java | 30 ++++++++++++++-----
.../PulsarConfigurationLoaderTest.java | 3 ++
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 1 +
.../pulsar/broker/service/BrokerService.java | 2 +-
.../org/apache/pulsar/common/util/FieldParser.java | 13 ++++++++
11 files changed, 85 insertions(+), 12 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 55e6f19..59a19d7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -285,6 +285,10 @@ managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
+# Default type of checksum to use when writing to BookKeeper. Default is "CRC32".
+# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+managedLedgerDigestType=CRC32
+
# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index de04064..dc6f1b7 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -250,6 +250,10 @@ managedLedgerDefaultWriteQuorum=1
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=1
+# Default type of checksum to use when writing to BookKeeper. Default is "CRC32".
+# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+managedLedgerDigestType=CRC32
+
# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index bbac80a..ed4ccb3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -51,7 +51,7 @@ public class ManagedLedgerConfig {
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
- private DigestType digestType = DigestType.MAC;
+ private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
/**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ee49810..312b894 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2515,7 +2515,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
// verify cursor-ledger's last entry has individual-deleted positions
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger individualDeletedMessagesCount = new AtomicInteger(0);
- bkc.asyncOpenLedger(c1.getCursorLedger(), DigestType.MAC, "".getBytes(), (rc, lh, ctx) -> {
+ bkc.asyncOpenLedger(c1.getCursorLedger(), DigestType.CRC32C, "".getBytes(), (rc, lh, ctx) -> {
if (rc == BKException.Code.OK) {
long lastEntry = lh.getLastAddConfirmed();
lh.asyncReadEntries(lastEntry, lastEntry, (rc1, lh1, seq, ctx1) -> {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 59d35d6..3035c28 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -345,7 +345,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
// Trigger the closure of the data ledger
- bkc.openLedger(p1.getLedgerId(), DigestType.MAC, new byte[] {});
+ bkc.openLedger(p1.getLedgerId(), DigestType.CRC32C, new byte[] {});
ledger.addEntry("entry-2".getBytes());
@@ -502,4 +502,37 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
factory.shutdown();
}
+ @Test
+ public void testChangeCrcType() throws Exception {
+ ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
+ config.setDigestType(DigestType.CRC32);
+ ManagedLedger ledger = factory.open("my_test_ledger", config);
+ ManagedCursor c1 = ledger.openCursor("c1");
+
+ ledger.addEntry("entry-0".getBytes());
+ ledger.addEntry("entry-1".getBytes());
+ ledger.addEntry("entry-2".getBytes());
+
+ ledger.close();
+
+ config.setDigestType(DigestType.CRC32C);
+ ledger = factory.open("my_test_ledger", config);
+ c1 = ledger.openCursor("c1");
+
+ ledger.addEntry("entry-3".getBytes());
+
+ assertEquals(c1.getNumberOfEntries(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+
+ List<Entry> entries = c1.readEntries(4);
+ assertEquals(entries.size(), 4);
+ for (int i = 0; i < 4; i++) {
+ assertEquals(new String(entries.get(i).getData()), "entry-" + i);
+ }
+
+ factory.shutdown();
+ }
+
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 685dd92..1680f71 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -139,6 +139,7 @@ public abstract class BookKeeperClusterTestCase {
protected void startBKCluster() throws Exception {
baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
baseClientConf.setUseV2WireProtocol(true);
+ baseClientConf.setEnableDigestTypeAutodetection(true);
if (numBookies > 0) {
bkc = new BookKeeperTestClient(baseClientConf);
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cc2a409..f851f7d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -201,7 +202,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Specify the tls cipher the broker will use to negotiate during TLS Handshake.
// Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
private Set<String> tlsCiphers = Sets.newTreeSet();
-
+
/***** --- Authentication --- ****/
// Enable authentication
private boolean authenticationEnabled = false;
@@ -222,7 +223,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private Set<String> proxyRoles = Sets.newTreeSet();
// If this flag is set then the broker authenticates the original Auth data
- // else it just accepts the originalPrincipal and authorizes it (if required).
+ // else it just accepts the originalPrincipal and authorizes it (if required).
private boolean authenticateOriginalAuthData = false;
// Allow wildcard matching in authorization
@@ -236,7 +237,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String brokerClientAuthenticationParameters = "";
// Path for the trusted TLS certificate file for outgoing connection to a server (broker)
private String brokerClientTrustCertsFilePath = "";
-
+
// When this parameter is not empty, unauthenticated users perform as anonymousUserRole
private String anonymousUserRole = null;
@@ -278,6 +279,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Number of guaranteed copies (acks to wait before write is complete)
@FieldContext(minValue = 1)
private int managedLedgerDefaultAckQuorum = 1;
+
+ // Default type of checksum to use when writing to BookKeeper. Default is "CRC32".
+ // Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no checksum).
+ private DigestType managedLedgerDigestType = DigestType.CRC32;
+
// Max number of bookies to use when creating a ledger
@FieldContext(minValue = 1)
private int managedLedgerMaxEnsembleSize = 5;
@@ -864,15 +870,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
public Set<String> getSuperUserRoles() {
return superUserRoles;
}
-
+
public Set<String> getProxyRoles() {
return proxyRoles;
}
-
+
public void setProxyRoles(Set<String> proxyRoles) {
this.proxyRoles = proxyRoles;
}
-
+
public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
@@ -908,7 +914,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
public void setBrokerClientTrustCertsFilePath(String brokerClientTrustCertsFilePath) {
this.brokerClientTrustCertsFilePath = brokerClientTrustCertsFilePath;
}
-
+
public String getAnonymousUserRole() {
return anonymousUserRole;
}
@@ -1031,6 +1037,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.managedLedgerDefaultAckQuorum = managedLedgerDefaultAckQuorum;
}
+ public DigestType getManagedLedgerDigestType() {
+ return managedLedgerDigestType;
+ }
+
+ public void setManagedLedgerDigestType(DigestType managedLedgerDigestType) {
+ this.managedLedgerDigestType = managedLedgerDigestType;
+ }
+
public int getManagedLedgerMaxEnsembleSize() {
return managedLedgerMaxEnsembleSize;
}
@@ -1467,7 +1481,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
this.authenticateOriginalAuthData = authenticateOriginalAuthData;
}
-
+
public Set<String> getTlsProtocols() {
return tlsProtocols;
}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 5ff95dc..038fb4d 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -32,6 +32,7 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Properties;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.testng.annotations.Test;
@@ -100,6 +101,7 @@ public class PulsarConfigurationLoaderTest {
printWriter.println("superUserRoles=appid1,appid2");
printWriter.println("brokerServicePort=7777");
printWriter.println("managedLedgerDefaultMarkDeleteRateLimit=5.0");
+ printWriter.println("managedLedgerDigestType=CRC32C");
printWriter.close();
testConfigFile.deleteOnExit();
InputStream stream = new FileInputStream(testConfigFile);
@@ -111,6 +113,7 @@ public class PulsarConfigurationLoaderTest {
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort(), 7777);
+ assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
}
@Test
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 954d4ef..0677800 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -53,6 +53,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setSpeculativeReadTimeout(conf.getBookkeeperClientSpeculativeReadTimeoutInMillis());
bkConf.setNumChannelsPerBookie(16);
bkConf.setUseV2WireProtocol(true);
+ bkConf.setEnableDigestTypeAutodetection(true);
bkConf.setLedgerManagerFactoryClassName(HierarchicalLedgerManagerFactory.class.getName());
if (conf.isBookkeeperClientHealthCheckEnabled()) {
bkConf.enableBookieHealthCheck();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bbdc696..577fab3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -674,7 +674,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
- managedLedgerConfig.setDigestType(DigestType.CRC32);
+ managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
index f6f4fc1..e476fc5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.common.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
+import com.fasterxml.jackson.databind.util.EnumResolver;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
@@ -90,6 +92,17 @@ public final class FieldParser {
// Lookup the suitable converter.
String converterId = from.getClass().getName() + "_" + to.getName();
Method converter = CONVERTERS.get(converterId);
+
+ if (to.isEnum()) {
+ // Converting string to enum
+ EnumResolver r = EnumResolver.constructUsingToString((Class<Enum<?>>) to, null);
+ T value = (T) r.findEnum((String) from);
+ if (value == null) {
+ throw new RuntimeException("Invalid value '" + from + "' for enum " + to);
+ }
+ return value;
+ }
+
if (converter == null) {
throw new UnsupportedOperationException("Cannot convert from " + from.getClass().getName() + " to "
+ to.getName() + ". Requested converter does not exist.");
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.