You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/07/07 20:54:46 UTC
[pulsar] branch master updated: Enable peeking encrypted batch messages (#11244)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d01ecb0 Enable peeking encrypted batch messages (#11244)
d01ecb0 is described below
commit d01ecb041847070eb1d011ebccadab5ee1d40ca3
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Jul 8 05:54:11 2021 +0900
Enable peeking encrypted batch messages (#11244)
---
.../broker/admin/impl/PersistentTopicsBase.java | 1 +
.../apache/pulsar/broker/admin/AdminApiTest.java | 43 ++++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 11 +++++-
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d85ea53..a3c4c3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2519,6 +2519,7 @@ public class PersistentTopicsBase extends AdminResource {
responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
}
+ responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c91131..f16d2a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -2988,6 +2989,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
+ }
+
+ @Test(timeOut = 20000)
+ public void testPeekEncryptedMessages() throws Exception {
+ final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+ final Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(true)
+ .addEncryptionKey("my-app-key")
+ .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+ .create();
+
+ for (int i = 0; i < 5; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+ producer.close();
+
+ final List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, subName, 5);
+ assertEquals(peekedMessages.size(), 5);
+
+ final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+ final List<Message<byte[]>> receivedMessages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ receivedMessages.add(msg);
+ consumer.acknowledge(msg);
+ }
+ consumer.unsubscribe();
+
+ for (int i = 0; i < 5; i++) {
+ assertEquals(peekedMessages.get(i).getMessageId(), receivedMessages.get(i).getMessageId());
+ assertEquals(peekedMessages.get(i).getData(), receivedMessages.get(i).getData());
+ }
}
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ae4de198..d122ded 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1535,8 +1535,17 @@ public class TopicsImpl extends BaseResource implements Topics {
}
tmp = headers.getFirst(BATCH_HEADER);
- if (response.getHeaderString(BATCH_HEADER) != null) {
+ if (tmp != null) {
properties.put(BATCH_HEADER, (String) tmp);
+ }
+
+ boolean isEncrypted = false;
+ tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+ if (tmp != null) {
+ isEncrypted = Boolean.parseBoolean(tmp.toString());
+ }
+
+ if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) {
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
}