You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 16:46:05 UTC
[pulsar] branch master updated: Add test for the key_shared
consumer enabled cryptoKeyReader (#9373)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 252a79a Add test for the key_shared consumer enabled cryptoKeyReader (#9373)
252a79a is described below
commit 252a79a0afc8d4bff954d563d139537e7956cc8d
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jan 31 00:45:35 2021 +0800
Add test for the key_shared consumer enabled cryptoKeyReader (#9373)
---
.../client/api/KeySharedSubscriptionTest.java | 114 ++++++++++++++++++++-
1 file changed, 110 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 938ffed..b5e032f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -25,6 +25,9 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -61,10 +64,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
@DataProvider(name = "batch")
- public Object[][] batchProvider() {
- return new Object[][] {
- { false },
- { true }
+ public Object[] batchProvider() {
+ return new Object[] {
+ false,
+ true
};
}
@@ -856,6 +859,72 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
}
+ @Test
+ public void testKeySharedConsumerWithEncrypted() throws Exception {
+ final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+ final int totalMessages = 100;
+
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .cryptoKeyReader(new EncKeyReader())
+ .subscribe();
+
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .cryptoKeyReader(new EncKeyReader())
+ .subscribe();
+
+ List<Consumer<Integer>> consumers = Lists.newArrayList(consumer1, consumer2);
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .cryptoKeyReader(new EncKeyReader())
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ producer.newMessage()
+ .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+ .value(i)
+ .send();
+ }
+
+ List<Message<Integer>> receives = new ArrayList<>(totalMessages);
+ int[] consumerReceivesCount = new int[] {0, 0};
+
+ for (int i = 0; i < consumers.size(); i++) {
+ while (true) {
+ Message<Integer> received = consumers.get(i).receive(3, TimeUnit.SECONDS);
+ if (received != null) {
+ receives.add(received);
+ int current = consumerReceivesCount[i];
+ consumerReceivesCount[i] = current + 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ Assert.assertEquals(receives.size(), totalMessages);
+ Assert.assertEquals(consumerReceivesCount[0] + consumerReceivesCount[1], totalMessages);
+ Assert.assertTrue(consumerReceivesCount[0] > 0);
+ Assert.assertTrue(consumerReceivesCount[1] > 0);
+
+ Map<String, Integer> maxValueOfKey = new HashMap<>();
+ receives.forEach(msg -> {
+ if (maxValueOfKey.containsKey(msg.getKey())) {
+ Assert.assertTrue(msg.getValue() > maxValueOfKey.get(msg.getKey()));
+ }
+ maxValueOfKey.put(msg.getKey(), msg.getValue());
+ });
+ }
+
private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -1037,4 +1106,41 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
"Key "+ key + "is distributed to multiple consumers." );
}));
}
+
+ private static class EncKeyReader implements CryptoKeyReader {
+
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
}