You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/29 03:53:46 UTC
[incubator-pulsar] branch master updated: Avoid boxing of checksum into a Long (#1467)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e035b0 Avoid boxing of checksum into a Long (#1467)
7e035b0 is described below
commit 7e035b0bd5c1fcbe6cb583ae11e6674e0bddc4f0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 28 20:53:44 2018 -0700
Avoid boxing of checksum into a Long (#1467)
---
.../org/apache/pulsar/broker/service/Consumer.java | 5 ++---
.../org/apache/pulsar/broker/service/Producer.java | 2 +-
.../pulsar/broker/service/ReplicatorTest.java | 2 +-
.../broker/service/persistent/ChecksumTest.java | 4 ++--
.../org/apache/pulsar/compaction/CompactorTest.java | 2 +-
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/ProducerImpl.java | 2 +-
.../java/org/apache/pulsar/common/api/Commands.java | 21 ++++++++++++++-------
.../pulsar/common/compression/CommandsTest.java | 2 +-
9 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4c5dbbc..12782c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.pulsar.common.api.Commands.readChecksum;
import java.util.Collections;
import java.util.Iterator;
@@ -37,7 +36,7 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
@@ -225,7 +224,7 @@ public class Consumer {
metadataAndPayload.retain();
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
- readChecksum(metadataAndPayload);
+ Commands.skipChecksumIfPresent(metadataAndPayload);
}
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 35bc315..2088035 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -176,7 +176,7 @@ public class Producer {
int readerIndex = headersAndPayload.readerIndex();
try {
- int checksum = readChecksum(headersAndPayload).intValue();
+ int checksum = readChecksum(headersAndPayload);
long computedChecksum = computeChecksum(headersAndPayload);
if (checksum == computedChecksum) {
return true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 08e4fb7..09c807f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -861,7 +861,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
ByteBuf b = msg.getHeadersAndPayload();
assertTrue(Commands.hasChecksum(b));
- int parsedChecksum = Commands.readChecksum(b).intValue();
+ int parsedChecksum = Commands.readChecksum(b);
int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
assertEquals(parsedChecksum, computedChecksum);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
index 3e6b3bd..94c3762 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
@@ -74,7 +74,7 @@ public class ChecksumTest extends BrokerTestBase {
ByteBuf b = entries.get(0).getDataBuffer();
assertTrue(Commands.hasChecksum(b));
- int parsedChecksum = Commands.readChecksum(b).intValue();
+ int parsedChecksum = Commands.readChecksum(b);
int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
assertEquals(parsedChecksum, computedChecksum);
@@ -95,7 +95,7 @@ public class ChecksumTest extends BrokerTestBase {
ByteBuf b = msg.getHeadersAndPayload();
assertTrue(Commands.hasChecksum(b));
- int parsedChecksum = Commands.readChecksum(b).intValue();
+ int parsedChecksum = Commands.readChecksum(b);
int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
assertEquals(parsedChecksum, computedChecksum);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 1867ff4..59c46ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -207,7 +207,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
- Commands.readChecksum(payloadAndMetadata);
+ Commands.skipChecksumIfPresent(payloadAndMetadata);
int metadataSize = payloadAndMetadata.readInt(); // metadata size
byte[] metadata = new byte[metadataSize];
payloadAndMetadata.readBytes(metadata);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c215431..bc62d3a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1026,7 +1026,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
if (hasChecksum(headersAndPayload)) {
- int checksum = readChecksum(headersAndPayload).intValue();
+ int checksum = readChecksum(headersAndPayload);
int computedChecksum = computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e1a505a..27757d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -733,7 +733,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
headerFrame.skipBytes(cmdSize);
// verify if checksum present
if (hasChecksum(headerFrame)) {
- int checksum = readChecksum(headerFrame).intValue();
+ int checksum = readChecksum(headerFrame);
// msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload
int metadataChecksum = computeChecksum(headerFrame);
long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index dcea2b2..f5470cc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -236,12 +236,19 @@ public class Commands {
return buffer.getShort(buffer.readerIndex()) == magicCrc32c;
}
- public static Long readChecksum(ByteBuf buffer) {
- if(hasChecksum(buffer)) {
- buffer.skipBytes(2); //skip magic bytes
- return buffer.readUnsignedInt();
- } else{
- return null;
+ /**
+ * Read the checksum and advance the reader index in the buffer.
+ *
+ * Note: This method assumes the presence of the checksum was already verified by the caller.
+ */
+ public static int readChecksum(ByteBuf buffer) {
+ buffer.skipBytes(2); //skip magic bytes
+ return buffer.readInt();
+ }
+
+ public static void skipChecksumIfPresent(ByteBuf buffer) {
+ if (hasChecksum(buffer)) {
+ readChecksum(buffer);
}
}
@@ -249,7 +256,7 @@ public class Commands {
try {
// initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata to parse
// metadata
- readChecksum(buffer);
+ skipChecksumIfPresent(buffer);
int metadataSize = (int) buffer.readUnsignedInt();
int writerIndex = buffer.writerIndex();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 1827416..186e7b2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -58,7 +58,7 @@ public class CommandsTest {
/*** 1. verify checksum and metadataParsing ***/
boolean hasChecksum = Commands.hasChecksum(receivedBuf);
- int checksum = Commands.readChecksum(receivedBuf).intValue();
+ int checksum = Commands.readChecksum(receivedBuf);
// verify checksum is present
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.