You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/07/07 20:54:46 UTC

[pulsar] branch master updated: Enable peeking encrypted batch messages (#11244)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d01ecb0  Enable peeking encrypted batch messages (#11244)
d01ecb0 is described below

commit d01ecb041847070eb1d011ebccadab5ee1d40ca3
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Jul 8 05:54:11 2021 +0900

    Enable peeking encrypted batch messages (#11244)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 43 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 11 +++++-
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d85ea53..a3c4c3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2519,6 +2519,7 @@ public class PersistentTopicsBase extends AdminResource {
             responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
             responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
         }
+        responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
 
         // Decode if needed
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c91131..f16d2a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -2988,6 +2989,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
         admin.topics().createNonPartitionedTopic(topicName);
         assertThrows(() -> {admin.topics().truncate(topicName);});
+    }
+
+    @Test(timeOut = 20000)
+    public void testPeekEncryptedMessages() throws Exception {
+        final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(true)
+                .addEncryptionKey("my-app-key")
+                .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+        producer.close();
+
+        final List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, subName, 5);
+        assertEquals(peekedMessages.size(), 5);
+
+        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
 
+        final List<Message<byte[]>> receivedMessages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            receivedMessages.add(msg);
+            consumer.acknowledge(msg);
+        }
+        consumer.unsubscribe();
+
+        for (int i = 0; i < 5; i++) {
+            assertEquals(peekedMessages.get(i).getMessageId(), receivedMessages.get(i).getMessageId());
+            assertEquals(peekedMessages.get(i).getData(), receivedMessages.get(i).getData());
+        }
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ae4de198..d122ded 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1535,8 +1535,17 @@ public class TopicsImpl extends BaseResource implements Topics {
             }
 
             tmp = headers.getFirst(BATCH_HEADER);
-            if (response.getHeaderString(BATCH_HEADER) != null) {
+            if (tmp != null) {
                 properties.put(BATCH_HEADER, (String) tmp);
+            }
+
+            boolean isEncrypted = false;
+            tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+            if (tmp != null) {
+                isEncrypted = Boolean.parseBoolean(tmp.toString());
+            }
+
+            if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) {
                 return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
             }