You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 16:46:05 UTC

[pulsar] branch master updated: Add test for the key_shared consumer enabled cryptoKeyReader (#9373)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 252a79a  Add test for the key_shared consumer enabled cryptoKeyReader (#9373)
252a79a is described below

commit 252a79a0afc8d4bff954d563d139537e7956cc8d
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jan 31 00:45:35 2021 +0800

    Add test for the key_shared consumer enabled cryptoKeyReader (#9373)
---
 .../client/api/KeySharedSubscriptionTest.java      | 114 ++++++++++++++++++++-
 1 file changed, 110 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 938ffed..b5e032f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -25,6 +25,9 @@ import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -61,10 +64,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
 
     @DataProvider(name = "batch")
-    public Object[][] batchProvider() {
-        return new Object[][] {
-                { false },
-                { true }
+    public Object[] batchProvider() {
+        return new Object[] {
+                false,
+                true
         };
     }
 
@@ -856,6 +859,72 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         Assert.assertNotNull(consumer3.receive(1, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testKeySharedConsumerWithEncrypted() throws Exception {
+        final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        final int totalMessages = 100;
+
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .cryptoKeyReader(new EncKeyReader())
+                .subscribe();
+
+        List<Consumer<Integer>> consumers = Lists.newArrayList(consumer1, consumer2);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .cryptoKeyReader(new EncKeyReader())
+                .create();
+
+        for (int i = 0; i < totalMessages; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        List<Message<Integer>> receives = new ArrayList<>(totalMessages);
+        int[] consumerReceivesCount = new int[] {0, 0};
+
+        for (int i = 0; i < consumers.size(); i++) {
+            while (true) {
+                Message<Integer> received = consumers.get(i).receive(3, TimeUnit.SECONDS);
+                if (received != null) {
+                    receives.add(received);
+                    int current = consumerReceivesCount[i];
+                    consumerReceivesCount[i] = current + 1;
+                } else {
+                    break;
+                }
+            }
+        }
+
+        Assert.assertEquals(receives.size(), totalMessages);
+        Assert.assertEquals(consumerReceivesCount[0] + consumerReceivesCount[1], totalMessages);
+        Assert.assertTrue(consumerReceivesCount[0] > 0);
+        Assert.assertTrue(consumerReceivesCount[1] > 0);
+
+        Map<String, Integer> maxValueOfKey = new HashMap<>();
+        receives.forEach(msg -> {
+            if (maxValueOfKey.containsKey(msg.getKey())) {
+                Assert.assertTrue(msg.getValue() > maxValueOfKey.get(msg.getKey()));
+            }
+            maxValueOfKey.put(msg.getKey(), msg.getValue());
+        });
+    }
+
     private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
         return pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -1037,4 +1106,41 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
                 "Key "+ key +  "is distributed to multiple consumers." );
         }));
     }
+
+    private static class EncKeyReader implements CryptoKeyReader {
+
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+        @Override
+        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+            String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                try {
+                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                }
+            } else {
+                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+            }
+            return null;
+        }
+
+        @Override
+        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+            String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+            if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                try {
+                    keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                }
+            } else {
+                Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+            }
+            return null;
+        }
+    }
 }