You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 13:13:53 UTC
[pulsar] 01/15: Websocket should pass the encryption context to the consumers (#12539)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2014457ae2b56eb9b231bfe2c6e8bee3e656cf53
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Oct 29 17:54:27 2021 -0700
Websocket should pass the encryption context to the consumers (#12539)
(cherry picked from commit d20efe5b0becd73ef33816c4bf33eafa2e28efa4)
---
.../websocket/proxy/ProxyPublishConsumeTest.java | 62 ++++++++++++++++++++++
.../websocket/proxy/SimpleConsumerSocket.java | 3 ++
.../apache/pulsar/websocket/ConsumerHandler.java | 1 +
.../pulsar/websocket/data/ConsumerMessage.java | 3 ++
4 files changed, 69 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 3e3cf9a..6386c51 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -29,12 +29,14 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -54,9 +56,11 @@ import javax.ws.rs.core.Response;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TopicType;
@@ -935,6 +939,64 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = 20000)
+ public void consumeEncryptedMessages() throws Exception {
+ final String subscription = "my-sub";
+ final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
+ final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
+ "/ws/v2/consumer/persistent/" + topic + "/" + subscription + "?cryptoFailureAction=CONSUME";
+ final int messages = 10;
+
+ WebSocketClient consumerClient = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+
+ final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu [...]
+ final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...]
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .defaultCryptoKeyReader(rsaPublicKeyData)
+ .addEncryptionKey("ws-consumer-a")
+ .create();
+
+ try {
+ consumerClient.start();
+ ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
+ Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
+
+ assertTrue(consumerFuture.get().isOpen());
+ assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+ for (int i = 0; i < messages; i++) {
+ producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+ }
+
+ producer.flush();
+ consumeSocket.sendPermits(messages);
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+
+ for (JsonObject msg : consumeSocket.messages) {
+ assertTrue(msg.has("encryptionContext"));
+ JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext");
+ JsonObject keys = encryptionCtx.getAsJsonObject("keys");
+ assertTrue(keys.has("ws-consumer-a"));
+
+ assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
+ }
+
+ // The message should not be acked since we only acked 1 message of the batch message
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+ .get(subscription).getMsgBacklog(), 0));
+
+ } finally {
+ stopWebSocketClient(consumerClient);
+ }
+ }
+
private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 749bfdcd..b1a9908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -44,6 +44,7 @@ public class SimpleConsumerSocket {
private final CountDownLatch closeLatch;
private Session session;
private final ArrayList<String> consumerBuffer;
+ final ArrayList<JsonObject> messages;
private final AtomicInteger receivedMessages = new AtomicInteger();
// Custom message handler to override standard message processing, if it's needed
private SimpleConsumerMessageHandler customMessageHandler;
@@ -51,6 +52,7 @@ public class SimpleConsumerSocket {
public SimpleConsumerSocket() {
this.closeLatch = new CountDownLatch(1);
consumerBuffer = new ArrayList<>();
+ this.messages = new ArrayList<>();
}
public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException {
@@ -79,6 +81,7 @@ public class SimpleConsumerSocket {
public synchronized void onMessage(String msg) throws JsonParseException, IOException {
receivedMessages.incrementAndGet();
JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+ this.messages.add(message);
if (message.get(X_PULSAR_MESSAGE_ID) != null) {
String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
consumerBuffer.add(messageId);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 54a5a62..0192188 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -160,6 +160,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
dm.properties = msg.getProperties();
dm.publishTime = DateFormatter.format(msg.getPublishTime());
dm.redeliveryCount = msg.getRedeliveryCount();
+ dm.encryptionContext = msg.getEncryptionCtx().orElse(null);
if (msg.getEventTime() != 0) {
dm.eventTime = DateFormatter.format(msg.getEventTime());
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
index 9660c95..9091a7e 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.pulsar.common.api.EncryptionContext;
@JsonInclude(Include.NON_NULL)
public class ConsumerMessage {
@@ -32,5 +33,7 @@ public class ConsumerMessage {
public int redeliveryCount;
public String eventTime;
+ public EncryptionContext encryptionContext;
+
public String key;
}