You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/08 22:04:42 UTC

[incubator-pulsar] branch master updated: Ensure the checksum is not stripped after validation in the broker (#1195)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 552249d  Ensure the checksum is not stripped after validation in the broker (#1195)
552249d is described below

commit 552249d9b79411b0972967156b3c99a8bbb858c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 8 14:04:37 2018 -0800

    Ensure the checksum is not stripped after validation in the broker (#1195)
    
    * Ensure the checksum is not stripped after validation in the broker
    
    * Fixed issue in C++ consumer to adjust size after reading the checksum
    
    * Fixed formatting
    
    * Added missing client protocol version check
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   4 +-
 .../pulsar/broker/service/ReplicatorTest.java      |  31 ++++++
 .../broker/service/persistent/ChecksumTest.java    | 105 +++++++++++++++++++++
 pulsar-client-cpp/lib/ClientConnection.cc          |  14 ++-
 pulsar-client-cpp/lib/ClientConnection.h           |   3 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |   3 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 8 files changed, 155 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index cf796d3..77c36e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -202,7 +202,7 @@ public class Consumer {
                 // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
                 metadataAndPayload.retain();
                 // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
-                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
+                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
                     readChecksum(metadataAndPayload);
                 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 72b12d6..e964598 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -168,11 +168,11 @@ public class Producer {
     }
 
     private boolean verifyChecksum(ByteBuf headersAndPayload) {
-
         if (hasChecksum(headersAndPayload)) {
-            int checksum = readChecksum(headersAndPayload).intValue();
             int readerIndex = headersAndPayload.readerIndex();
+
             try {
+                int checksum = readChecksum(headersAndPayload).intValue();
                 long computedChecksum = computeChecksum(headersAndPayload);
                 if (checksum == computedChecksum) {
                     return true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index f1394d1..cbef1d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -50,13 +50,17 @@ import org.apache.pulsar.broker.namespace.OwnedBundle;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.checksum.utils.Crc32cChecksum;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -73,6 +77,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Starts 2 brokers that are in 2 different clusters
  */
@@ -822,7 +828,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
         producer1.close();
         consumer1.close();
         consumer2.close();
+    }
+
+    @Test(timeOut = 30000)
+    public void verifyChecksumAfterReplication() throws Exception {
+        final String topicName = "persistent://pulsar/global/ns/checksumAfterReplication";
+
+        PulsarClient c1 = PulsarClient.create(url1.toString());
+        Producer p1 = c1.createProducer(topicName);
+
+        PulsarClient c2 = PulsarClient.create(url2.toString());
+        RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
+
+        p1.send("Hello".getBytes());
+
+        RawMessage msg = reader2.readNextAsync().get();
+
+        ByteBuf b = msg.getHeadersAndPayload();
+
+        assertTrue(Commands.hasChecksum(b));
+        int parsedChecksum = Commands.readChecksum(b).intValue();
+        int computedChecksum = Crc32cChecksum.computeChecksum(b);
+
+        assertEquals(parsedChecksum, computedChecksum);
 
+        p1.close();
+        reader2.closeAsync().get();
     }
 
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
new file mode 100644
index 0000000..0d23ab2
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.checksum.utils.Crc32cChecksum;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.common.api.Commands;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ */
+@Test
+public class ChecksumTest extends BrokerTestBase {
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void verifyChecksumStoredInManagedLedger() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/topic0";
+
+        Producer producer = pulsarClient.createProducer(topicName);
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
+
+        ManagedLedger ledger = topic.getManagedLedger();
+        ManagedCursor cursor = ledger.openCursor("test");
+
+        producer.send("Hello".getBytes());
+
+        List<Entry> entries = cursor.readEntriesOrWait(1);
+        assertEquals(entries.size(), 1);
+
+        ByteBuf b = entries.get(0).getDataBuffer();
+
+        assertTrue(Commands.hasChecksum(b));
+        int parsedChecksum = Commands.readChecksum(b).intValue();
+        int computedChecksum = Crc32cChecksum.computeChecksum(b);
+        assertEquals(parsedChecksum, computedChecksum);
+
+        entries.get(0).release();
+        producer.close();
+    }
+
+    @Test
+    public void verifyChecksumSentToConsumer() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abc/topic-1";
+
+        Producer producer = pulsarClient.createProducer(topicName);
+        RawReader reader = RawReader.create(pulsarClient, topicName, "sub").get();
+
+        producer.send("Hello".getBytes());
+
+        RawMessage msg = reader.readNextAsync().get();
+
+        ByteBuf b = msg.getHeadersAndPayload();
+        assertTrue(Commands.hasChecksum(b));
+        int parsedChecksum = Commands.readChecksum(b).intValue();
+        int computedChecksum = Crc32cChecksum.computeChecksum(b);
+        assertEquals(parsedChecksum, computedChecksum);
+
+        producer.close();
+        reader.closeAsync().get();
+    }
+
+}
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 08d5572..fc773fe 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -471,7 +471,8 @@ void ClientConnection::processIncomingBuffer() {
             MessageMetadata msgMetadata;
 
             // read checksum
-            bool isChecksumValid = verifyChecksum(incomingBuffer_, incomingCmd_);
+            uint32_t remainingBytes = frameSize - (cmdSize + 4);
+            bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd_);
 
             uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
             if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
@@ -485,8 +486,9 @@ void ClientConnection::processIncomingBuffer() {
             }
 
             incomingBuffer_.consume(metadataSize);
+            remainingBytes -= (4 + metadataSize);
 
-            uint32_t payloadSize = frameSize - (cmdSize + 4) - (metadataSize + 4);
+            uint32_t payloadSize = remainingBytes;
             SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
             incomingBuffer_.consume(payloadSize);
             handleIncomingMessage(incomingCmd_.message(), isChecksumValid, msgMetadata, payload);
@@ -518,13 +520,17 @@ void ClientConnection::processIncomingBuffer() {
     readNextCommand();
 }
 
-bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, proto::BaseCommand& incomingCmd_) {
+bool ClientConnection::verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
+                                      proto::BaseCommand& incomingCmd_) {
     int readerIndex = incomingBuffer_.readerIndex();
     bool isChecksumValid = true;
+
     if (incomingBuffer_.readUnsignedShort() == Commands::magicCrc32c) {
         uint32_t storedChecksum = incomingBuffer_.readUnsignedInt();
+        remainingBytes -= (2 + 4) /* subtract size of checksum itself */;
+
         // compute metadata-payload checksum
-        int metadataPayloadSize = incomingBuffer_.readableBytes();
+        int metadataPayloadSize = remainingBytes;
         uint32_t computedChecksum = computeChecksum(0, incomingBuffer_.data(), metadataPayloadSize);
         // verify checksum
         isChecksumValid = (storedChecksum == computedChecksum);
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 6b0d884..47f4994 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -166,7 +166,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
     void handleRead(const boost::system::error_code& err, size_t bytesTransferred, uint32_t minReadSize);
 
     void processIncomingBuffer();
-    bool verifyChecksum(SharedBuffer& incomingBuffer_, proto::BaseCommand& incomingCmd_);
+    bool verifyChecksum(SharedBuffer& incomingBuffer_, uint32_t& remainingBytes,
+                        proto::BaseCommand& incomingCmd_);
 
     void handleIncomingCommand();
     void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 8b1ee0e..2c13125 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -198,6 +198,7 @@ public final class PulsarApi {
     v8(8, 8),
     v9(9, 9),
     v10(10, 10),
+    v11(11, 11),
     ;
     
     public static final int v0_VALUE = 0;
@@ -211,6 +212,7 @@ public final class PulsarApi {
     public static final int v8_VALUE = 8;
     public static final int v9_VALUE = 9;
     public static final int v10_VALUE = 10;
+    public static final int v11_VALUE = 11;
     
     
     public final int getNumber() { return value; }
@@ -228,6 +230,7 @@ public final class PulsarApi {
         case 8: return v8;
         case 9: return v9;
         case 10: return v10;
+        case 11: return v11;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index db2eec4..964fd52 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -134,6 +134,7 @@ enum ProtocolVersion {
 	v8 = 8;  // Added CommandConsumerStats - Client fetches broker side consumer stats
 	v9 = 9;  // Added end of topic notification
 	v10 = 10;// Added proxy to broker
+	v11 = 11;// C++ consumers before this version are not correctly handling the checksum field
 }
 
 message CommandConnect {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.