You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/14 17:56:31 UTC

[pulsar] branch master updated: [Java shade client] Add encryption integration test (#8850)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08c0030  [Java shade client] Add encryption integration test (#8850)
08c0030 is described below

commit 08c0030ad57c763812230352c3b36feb322fe3f2
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Tue Dec 15 01:56:10 2020 +0800

    [Java shade client] Add encryption integration test (#8850)
    
    ### Motivation
    Currently, there are no encryption integration test cases in the shaded client test.
    #6834 needs more test cases to address the issue.
    
    ### Modification
    Add encryption integration test cases to the shaded client test.
---
 pulsar-client-all/pom.xml                          |  13 +
 tests/pulsar-client-all-shade-test/pom.xml         |  13 +
 .../integration/SimpleProducerConsumerTest.java    | 574 +++++++++++++++++++++
 .../src/test/resources/certificate/client.crt      |  20 +
 .../src/test/resources/certificate/client.csr      |  17 +
 .../src/test/resources/certificate/client.key      |  28 +
 .../certificate/private-key.client-ecdsa.pem       |  13 +
 .../private-key.client-mismatch-rsa.pem            |  29 ++
 .../certificate/private-key.client-rsa.pem         |  27 +
 .../certificate/public-key.client-ecdsa.pem        |   7 +
 .../certificate/public-key.client-mismatch-rsa.pem |   9 +
 .../certificate/public-key.client-rsa.pem          |   9 +
 .../src/test/resources/certificate/server.crt      |  20 +
 .../src/test/resources/certificate/server.csr      |  17 +
 .../src/test/resources/certificate/server.key      |  28 +
 .../src/test/resources/pulsar.xml                  |   1 +
 16 files changed, 825 insertions(+)

diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 12d1965..3694574 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -35,6 +35,11 @@
   <dependencies>
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-api</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -76,6 +81,14 @@
                   <includes>**/ProtobufSchema.class</includes>
                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
                 </artifactItem>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>pulsar-client-api</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <outputDirectory>${project.build.directory}/classes</outputDirectory>
+                </artifactItem>
               </artifactItems>
             </configuration>
           </execution>
diff --git a/tests/pulsar-client-all-shade-test/pom.xml b/tests/pulsar-client-all-shade-test/pom.xml
index 8ea60d0..1419a66 100644
--- a/tests/pulsar-client-all-shade-test/pom.xml
+++ b/tests/pulsar-client-all-shade-test/pom.xml
@@ -48,6 +48,19 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>bouncy-castle-bc</artifactId>
+            <version>${project.version}</version>
+            <classifier>pkg</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-messagecrypto-bc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
new file mode 100644
index 0000000..7245385
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance to process the message again.
+         * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers
+         * - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+
+        // Consuming from consumer 2 and 3 again just to be sure
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {
+            assertTrue(messages.contains((message + i)));
+        }
+
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        newPulsarClient.close();
+        newPulsarClient1.close();
+        newPulsarClient2.close();
+    }
+
+    @Test
+    public void testEncryptionFailure() throws Exception {
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+                    }
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+                    }
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        MessageImpl<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        // 1. Invalid key name
+        try {
+            pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+                    .addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+            Assert.fail("Producer creation should not suceed if failing to read key");
+        } catch (Exception e) {
+            // ok
+        }
+
+        // 2. Producer with valid key name
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(new EncKeyReader())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // 3. KeyReder is not set by consumer
+        // Receive should fail since key reader is not setup
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(msg, "Receive should have failed with no keyreader");
+
+        // 4. Set consumer config to consume even if decryption fails
+        consumer.close();
+        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        int msgNum = 0;
+        try {
+            // Receive should proceed and deliver encrypted message
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            String expectedMessage = "my-message-" + msgNum++;
+            Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
+                    + " should not match the expected message " + expectedMessage);
+            consumer.acknowledgeCumulative(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
+        }
+
+        // 5. Set keyreader and failure action
+        consumer.close();
+        // Set keyreader
+        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+                .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        for (int i = msgNum; i < totalMsg - 1; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+
+        // 6. Set consumer config to discard if decryption fails
+        consumer.close();
+        consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        // Receive should proceed and discard encrypted messages
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
+    }
+    @Test(groups = "encryption")
+    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic3")
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic3")
+                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
+
+        String message = "my-message";
+        producer.send(message.getBytes());
+
+        TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+
+        String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
+        assertEquals(message, receivedMessage);
+
+        consumer.close();
+    }
+
+    private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader)
+            throws Exception {
+        Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
+        Assert.assertTrue(ctx.isPresent());
+        EncryptionContext encryptionCtx = ctx
+                .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+
+        Map<String, EncryptionContext.EncryptionKey> keys = encryptionCtx.getKeys();
+        assertEquals(keys.size(), 1);
+        EncryptionContext.EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+        byte[] dataKey = encryptionKey.getKeyValue();
+        Map<String, String> metadata = encryptionKey.getMetadata();
+        String version = metadata.get("version");
+        assertEquals(version, "1.0");
+
+        CompressionType compressionType = encryptionCtx.getCompressionType();
+        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+        byte[] encrParam = encryptionCtx.getParam();
+        String encAlgo = encryptionCtx.getAlgorithm();
+        int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+        ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+        // try to decrypt use default MessageCryptoBc
+        MessageCrypto crypto = new MessageCryptoBc("test", false);
+        PulsarApi.MessageMetadata.Builder metadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+        org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.Builder encKeyBuilder = PulsarApi.EncryptionKeys.newBuilder();
+        encKeyBuilder.setKey(encryptionKeyName);
+        ByteString keyValue = ByteString.copyFrom(dataKey);
+        encKeyBuilder.setValue(keyValue);
+        PulsarApi.EncryptionKeys encKey = encKeyBuilder.build();
+        metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+        metadataBuilder.setEncryptionAlgo(encAlgo);
+        metadataBuilder.setProducerName("test");
+        metadataBuilder.setSequenceId(123);
+        metadataBuilder.setPublishTime(12333453454L);
+        metadataBuilder.addEncryptionKeys(encKey);
+        metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType));
+        metadataBuilder.setUncompressedSize(uncompressedSize);
+        ByteBuf decryptedPayload = crypto.decrypt(() -> metadataBuilder.build(), payloadBuf, reader);
+
+        // try to uncompress
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
+
+        if (batchSize > 0) {
+            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                    .newBuilder();
+            uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+                    singleMessageMetadataBuilder, 0, batchSize);
+        }
+
+        byte[] data = new byte[uncompressedPayload.readableBytes()];
+        uncompressedPayload.readBytes(data);
+        uncompressedPayload.release();
+        return new String(data);
+    }
+
+}
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.crt b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.crt
new file mode 100644
index 0000000..2d7d156
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDVjCCAj4CCQCtw/UnTFDT7DANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJB
+VTETMBEGA1UECAwKU29tZS1TdGF0ZTEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBmNsaWVu
+dDAeFw0xNjA2MjAwMTQ1NDZaFw0yNjA2MTgwMTQ1NDZaMG0xCzAJBgNVBAYTAkFV
+MRMwEQYDVQQIDApTb21lLVN0YXRlMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxITAf
+BgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqQV5F3Au9FWXIYPdWqiX
+Rk5gdVmVkDuuFK4ZoOd8inoJpB3PPkpmpgoVkKQHDFhgx3ODGWIUgo+n6QDsJxY4
+ygHfVeggQgek8iUfteYVsIcHS0bjkhIij/3ihC301FkiqbrV069oLvUXLKcv3zxG
+mdBAiz0k4xGZhFieVRvQCLY9syUUxmQ/3Cv42lDY8a1gTw4CRRx/hCfDvXCKhOT4
+bMwUIDZfHB3JoDh3Thp8FLz0nTrRF75mSQJ/OdcafIm0Xoz2Otp/CSxLS+U1lLvG
+05crWTDe0om7NW4mK4CqGCFq5gUw7eIzaeO7Q5Qez9XGTMzkgIDTMvNYGGEeJhhm
+NQIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAKXy4g6hljY5MpO8mbZh+uJHq6NEUs
+4dr7OKDDWc39AROZsGf2eFUmHOjmRSw7VHpguGKI+rFRELVffpg/VvMh5apu+DBf
+jhxtDNceAyh5uugPNUJHXyeikBDYW8bAzUU3DmMldPkTZWcGjurmyhDQ1TtK2YJe
+RMFBXw5aAzdJMNi6OfXDH/ZX32hrb482yghDZj+ndnm0FefmLbFTQRMF8/fIHb1W
+kqNHwIaapZwH6j/MJy/TRFYcJunrBUYT9zVjY46k3GU0ex/Bn7T4pg9gzgFGZJhn
+jQQFKliIC84thCzdlPkrLduLY8tmlDKpLXatbEQ+s1MmNOURm6irPp6g
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.csr b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.csr
new file mode 100644
index 0000000..e01f33e
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICsjCCAZoCAQAwbTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
+FTATBgNVBAcMDERlZmF1bHQgQ2l0eTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
+DwAwggEKAoIBAQCpBXkXcC70VZchg91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+
+SmamChWQpAcMWGDHc4MZYhSCj6fpAOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOS
+EiKP/eKELfTUWSKputXTr2gu9Rcspy/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTG
+ZD/cK/jaUNjxrWBPDgJFHH+EJ8O9cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEX
+vmZJAn851xp8ibRejPY62n8JLEtL5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt
+4jNp47tDlB7P1cZMzOSAgNMy81gYYR4mGGY1AgMBAAGgADANBgkqhkiG9w0BAQUF
+AAOCAQEAk3eueaq/gonBzKH75oWHlqPbMZQFk4NXqx8h24ZfkCzPEFPyDM+jdQxv
+8vDtyWq+fizqAQmGrM7WPHgnTbmZyovfmwuKwtTlkD/8t7XpTmm9fYspbL4WzdP1
+y8/Vug09te+rni+v+kjk5b9IceEy6kLvXuzirE6c4LunAm+thrr5gWmsx1pyDiq7
+W2M15UZrm/paaCg6cVaMFdXCRZP+g1P4NcgDUe2TyFbLlhOJNtX3DJRZWEhrkEYK
+mRz2tJuiuitCzheAgRrFXepRagHKYffNSas1n/2kIc9QpZ8654kxsAzEwL7CnHd/
+SHbMS9dfP+uM6DACwcvngSOBMJ9KMg==
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.key b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.key
new file mode 100644
index 0000000..34fc701
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/client.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCpBXkXcC70VZch
+g91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+SmamChWQpAcMWGDHc4MZYhSCj6fp
+AOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOSEiKP/eKELfTUWSKputXTr2gu9Rcs
+py/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTGZD/cK/jaUNjxrWBPDgJFHH+EJ8O9
+cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEXvmZJAn851xp8ibRejPY62n8JLEtL
+5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt4jNp47tDlB7P1cZMzOSAgNMy81gY
+YR4mGGY1AgMBAAECggEAcJj3yVhvv0/BhY8+CCYl2K1f7u1GCLbpSleNNTbhLbMM
+9yrwo/OWnGg9Y4USOPQrTNOz81X2id+/oSZ/K67PGCvVJ3qi+rny9WkrzdbAfkAF
+6O0Jr4arRbeBjkK7Rjc3M1EHH6VLx3R5AsNBzfpuogss5FVQXICd/5+1oscLeLEx
+/Fn+51IEn9FUg5vr7ElG51f+zPxexcWHLNoqGjTEIGGtI8/CfTzD9tBV4sIjf/Nc
+Zzfs9XYrChfcrS0U1zDa+L7c5gYfoN6M08sBiuZlhyyO9wgzPlp+XnsrSFv6hUta
+0scjAbN4bh+orQn6zgFN/sjkQnraWXW7pKFLyTR/IQKBgQDVju4IbhE9XRweNgXi
+s3BuGV+HsuFffEf0904/zCuCUcScGb5WCz5+KtlFJ//YxfocHVZajH+4GdCGbWim
+m+H3XvRpWgfK/aBNOXu5ueLbnPYyPjTrcpKRsomeoiV+Jz1tv5PQElwzCiCzVvQf
+fMyhQT16YIsFQAGJzQMBEHWODQKBgQDKnKps3sKSR3ycUtIxCVXUir7p52qst0Pm
+bPO8JrcRKZP2z8MJB96+DcQFzrxj7t5DDktkYEsFOPPuIeUsYXsY+MKHs4hEQVCz
+hpDJJNQ8s+SV8TLzKpinZEmLIjslLbn2rQrpqybPg84VxqX3qqM8IrXhMf77aGj6
+QHqvQwHWyQKBgQDF1RVO+9++j82ncvY6z22coKath5leIjxqgtqbISFBJUxUK0j2
+Xo4yxLDnbqmE/8m1V7wSP8tlGYzhquLiTM+kn/Mc0Ukc0503TMQABmJQfXRYkOXn
+IwkCLXltWdoPpnwyeeGNRCTjJ0OpvyiBLtRFobE498xxPZzvMdrRlpS/1QKBgQCo
+wmMleUnBQ2/kWQugMnFeLg6kjs+IesFAnYFKN0kGL4aB7j06OWbrEFY0rCS4bA6O
+9coQGjCCchSjRXI4TB2XCCQnmX8nsuuADNZt45Iv2XrM9XEFn3Y0/tBO5j0zU2nw
+r+NGC/uwns050BMPPf7mqNarctQ6HZZK0wgdEQfoGQKBgC+pbkQv9cn68TsiaJ3w
+tvNRTXCIAAH4Vtn9Cp+63ao+kXn94BJqQF99i58kJpG4ol6wbCHUoC6fHgxUh5HB
+JB0HjC2eCMgn4acAQg0sPW6l35KX36yYxtrL7eosB/yBYum0XAwmboNjEhlCZkOs
+YOpSsn61g7xqqrt40Spb5vUn
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
new file mode 100644
index 0000000..58ab3d4
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
@@ -0,0 +1,13 @@
+-----BEGIN EC PARAMETERS-----
+MIGXAgEBMBwGByqGSM49AQECEQD////9////////////////MDsEEP////3/////
+//////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2
+eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1
+ow0bkDihFQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIHYAgEBBBDeu9hc8kOvL3pl+LYSjLq9oIGaMIGXAgEBMBwGByqGSM49AQECEQD/
+///9////////////////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZ
+PCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bP
+Wsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1ow0bkDihFQIBAaEkAyIABOsqPpE8
+cY80pxkog5xw3i2AQ0yfV3MqMusxlOQnigBp
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000..3e2831a
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
@@ -0,0 +1,29 @@
+-----BEGIN EC PARAMETERS-----
+MIIBwgIBATBNBgcqhkjOPQEBAkIB////////////////////////////////////
+//////////////////////////////////////////////////8wgZ4EQgH/////
+////////////////////////////////////////////////////////////////
+/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh
+Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5
+MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte
+d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY
+9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC
+QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3
+CaXQO7XJuImcR667b7cekThkCQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIICnQIBAQRCAeNLEp1HefZ1nMl5vvgFMsJCd5ieCWqPT7TXbQkn27A8WkyAGTYC
+GtolyPokOgSjbJh+ofBt/MgvE/nMrqzmkZVtoIIBxjCCAcICAQEwTQYHKoZIzj0B
+AQJCAf//////////////////////////////////////////////////////////
+////////////////////////////MIGeBEIB////////////////////////////
+//////////////////////////////////////////////////////////wEQVGV
+PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1
+c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3
+BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB
+hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu
+cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB////////////////////
+///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4
+ZAkCAQGhgYkDgYYABAFhUHeaHfIWre/pPmv2a2l891co79dFpg6ixPRg+Y5qe0C7
+src//LT/ZR5rgj8ne+YcaIlwyQRl5OYEd25n799IcgHIBTGyaLB6Td5mW/oWT/Fz
+soufOnUJ7O/kDHjIQ15sczk3rDhe8/mB9zPjKlKTuAl5jBEt6E3yiB44Dtng02xD
+uQ==
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-rsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
new file mode 100644
index 0000000..a0d589e
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAtKWwgqdnTYrOCv+j1MkTWfSH0wCsHZZca9wAW3qP4uuhlBvn
+b10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhURHM4yuXqzCxzuAwkQSo39rzX8PGC
+7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9ER1e5O/uEPAotP933hHQ0J2hMEek
+HqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+cMd8ZbGScPqI9M38bmn3OLxFTn1vt
+hpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs7ppZnRbv8Hc/DFdOKVQIgam6CDdn
+NKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQGQIDAQABAoIBAAaJFAi2C7u3cNrf
+AstY9vVDLoLIvHFZlkBktjKZDYmVIsRb+hSCViwVUrWLL67R6+Iv4eg4DeTOAx00
+8pncXKgZTw2wIb1/QjR/Y/RjlaC8lkdmRWli7udMQCZVsyhuSjW6Pj7vr8YE4woj
+FhNijxEGcf9wWrmMJrzdnTWQiXByo+eTvUQ9BPgPGrRjsMZmTkLyAVJff2DfxO5b
+IWFDYDJcyYAMCIMQu7vys/I50ou6ilb1CO6QM6Z7KpPeOoVFPwtzbh8cf9xM8UNS
+j6J/JmdWhgI34GS3NA68xTQ6PV7zjnhCc+iccm3JKyzGXwaApAZ+Eoce/9j4WKmu
+5B4ziR0CgYEA3l/9OHbl1zmyV+rRxWOIj/i2rTvHzwBnbnPJyuemL5VMFdpGodQ3
+vwHvyQmcECRVRxmXojQ4QuPPHs3qp6wEEFPCWxChLSTxlUc85SOFHWU2O99jV7zI
+7+JOpDK/Mstsx9nHgXduJF+glTFtA3LH8Oqylzu2aFPsprwKuZf94Q8CgYEAz/Zx
+akEG+PEMtP5YS28cX5XfjsIX/V26Fs6/sH16QjUIEddE5T4fCuokxCjSiwUcWhml
+pHEJ5S5xp3VYRfISW3jRW3qstIH1tpZipB6+S0zTuJmLJbA3IiWEg2rtMt7X1uJv
+A/bYOqe0hOPTuXuZdtVZ0nMTKk7GG8O6VkBI7FcCgYEAkDfCmscJgs7JahlBWHmX
+zH9pwem+SPKjIc/4NB6N+dgikx2Pp05hpP/VihUwYIufvs/LNogVYNQrtHepUnrN
+2+TmbHbZgNSv1Ldxt82UfB7y0FutKu6lhmXHyNecho3Fi8sih0V0aiSWmYuHfrAH
+GaiskEZKo1iiZvQXJIx9O2MCgYATBf0r9hTYMtyxtc6H3/sdd01C9thQ8gDy0yjP
+0Tqc0dMSJroDqmIWkoKYew9/bhFA4LW5TCnWkCAPbHmNtG4fdfbYwmkH/hdnA2y0
+jKdlpfp8GXeUFAGHGx17FA3sqFvgKUh0eWEgRHUL7vdQMVFBgJS93o7zQM94fLgP
+6cOB8wKBgFcGV4GjI2Ww9cillaC554MvoSjf8B/+04kXzDOh8iYIIzO9EUil1jjK
+Jvxp4hnLzTKWbux3MEWqurLkYas6GpKBjw+iNOCar6YdqWGVqM3RUx7PTUaZwkKx
+UdP63IfY7iZCIT/QbyHQvIUe2MaiVnH+ulxdkK6Y5e7gxcbckIH4
+-----END RSA PRIVATE KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
new file mode 100644
index 0000000..5aeb429
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
@@ -0,0 +1,7 @@
+-----BEGIN PUBLIC KEY-----
+MIHKMIGjBgcqhkjOPQIBMIGXAgEBMBwGByqGSM49AQECEQD////9////////////
+////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1N
+aW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd
+7XqDAhEA/////gAAAAB1ow0bkDihFQIBAQMiAATrKj6RPHGPNKcZKIOccN4tgENM
+n1dzKjLrMZTkJ4oAaQ==
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000..6fc427b
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-rsa.pem b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
new file mode 100644
index 0000000..6fc427b
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.crt b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.crt
new file mode 100644
index 0000000..59b651b
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDLjCCAhYCCQDn/Yvym+FMsDANBgkqhkiG9w0BAQUFADBZMQswCQYDVQQGEwJB
+VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMRIwEAYDVQQDEwlsb2NhbGhvc3QwHhcNMTYwNjEzMjIyMTQ2WhcN
+MjYwNjExMjIyMTQ2WjBZMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0
+ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRIwEAYDVQQDEwls
+b2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCs29IuzZvk
+OGUkS/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du99
+8xbgeihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT
+1DepD/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQr
+mTbLzzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J
+92K+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNb
+O9hVS4bqqsw9AgMBAAEwDQYJKoZIhvcNAQEFBQADggEBACE0WBuTbHcPtYKv2ZMS
+mYk9jvtAhmWHQ6tNqV8CmS2AsrzZdWglGaqIRsm5slkD2BGeQS+BesTArUuENTmP
+r9kJSecdiiB8aWtLbhoCSH3QR6IW/b5UVl6sR5OIh7SkNTjMSUSDnMEVLNGyKZGS
+gCGVbDf3n5KhOTnwqguELRykynKFt2LVksBia9+88lUtiRHpbyClo/KVWltJlaww
+PT0WEpwqVUcHmwrR3MTzJDEPvIplSgxdaDmFGYS1YKm9T/wQd+t/0DbXMmfJXBbd
+FVUnB6o7qJVU9N2Tbaj9NbCtwz5nTZG4A5kRXWHVjZsn5WzLuS/me3rDXjwlfB2p
+ipY=
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.csr b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.csr
new file mode 100644
index 0000000..8782222
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICnjCCAYYCAQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUx
+ITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAxMJbG9j
+YWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArNvSLs2b5Dhl
+JEv8Kis3f4dnrKowko8OEiy23odRgN1BL4dZPfnkYmMCRtcgjh5nLKeAPA7vffMW
+4HooaVlsO6Yy+VVZMt3Irrx8s9QnOmykabPxs1SQcBT9IMx5yvG1DPOlOdshk9Q3
+qQ/2NpVEctIoHVXADIxsHjMKE5omlXMGXsX2LhHn/o8935qZjsD/a26CAPa0K5k2
+y885GwMGV3Zfm0byHnyIyGsyM0zrPLSVq4RWkyVIAYYS4xSME3aUvp/hajrvCfdi
+vj7yoyHid5Jp2ckRyu7onZQJ9IPwyvnKNbF98zkTwlnoNtSvv71R04/dcwETWzvY
+VUuG6qrMPQIDAQABoAAwDQYJKoZIhvcNAQEFBQADggEBAEPHySnpf3E/7tZsiDka
+rqdB/sU7fdqjyV0iy0cuKQkU8WYrsE7bHkqMYc8CiIDfWhIGW5Jnzups2O6eH0Sx
+2BS21ARFiNGC1UfY1HSV2zrTNh3RqQa3YsXzv9vvdQ/gjsqGDuGDIc1yAA+Ytdja
+3rhIzEVqBhiLzg+M2+gW1zs+Kqj0Zo0pLB2uqhdZJmjxBb2FCli50vCVEhqIS3RO
+KTE+AJfxThWIeahFyVaskaEGkS6NVr2JihV0elbKolH19k2UzRTVn7p3Ixh5ojuW
+gtU/90vOy/SDkSRmCWMqgkUKJ2oeImleHdrvwNyrzvrLWRAz6R5yGQJwji9kKpHD
+FK0=
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.key b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.key
new file mode 100644
index 0000000..6da70f5
--- /dev/null
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/certificate/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCs29IuzZvkOGUk
+S/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du998xbg
+eihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT1Dep
+D/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQrmTbL
+zzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J92K+
+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNbO9hV
+S4bqqsw9AgMBAAECggEAd/LuDeZFZ/+uR5qmuAhXMZqfWZSbsges5vW6S/6wkvB1
+vGp6heQzFAbKXKgJgjUcuULeXE6s58RYuppqEnin/1hcBOKxy/dUu9Q14H+2XPdo
+u6TPcvaaZ/xYjnr1hNtnHD6yB8zEpxVbLmjSHJxF7Dti9MA9TTfgCrC2LFYKsicD
+/5AQyHuwpHyTL3Iiwv4Qtks/SD2a3fu8lD0yTQwA/hY6/0ieXxXd9tZV5a6GSA0P
+nieol1byfuX7Q5fb8ggPd9u9K1mVZTBRKiE5w+uU4Ic2IkBmZX5ZuRS+vFplpLsY
+YpFPvzFmpNkpK2SdYjJ+V4tkJsFHmOaFRgW/0QB2DQKBgQDeQMSZBQlPUrgRdWHN
+OyvTcrSvXzg5DbaIj39tgdNZ6PYns/thD0n707KGRJOChIyYiiKxLxzLWdPUxqQO
+rNLUV9IkMVc/QZR8RUqGc2BxmPOxAprhzeOhLsyqP/sgtxRHAnLqmkXuHYoxvTZ6
+LFCRCZBpEJrutGxl3s/x+sfkuwKBgQDHGwnSmvArpL8ZY1dV4xKNkxifCBnNmqAl
+TKHPW3odN9nkMECEt1XUIioUUKXUsiAZNp5xa/v1DEyJ4f2T20QKcAGbS18b1M5W
+axIoH3IhyLo74tuo0fthgq5bzypfFOlIjo7F9mpEky/461RWmoNAAlp9+FkDi48C
+KwjAk39/ZwKBgQDXFJqs8sDFsOlMi+nvsHmDERhmNqG0JN8mXKgWk3KzKc09MuHs
+Vd1lBMNZSHfv8NIWtGdKTKty5yUmXm1ZfkoxECPevpkOMCq/8FZksrb8d+YswLae
+Gp9U1nNdtrkSOdo3tdj7y/wsqQ2ZgOB9bvEwyq6j3lvw8U2NcAiQxf44DQKBgBHb
+lPf0uZHQhutKA61KXoGgLdclrNrKAY8W3nRwqfUw6zQSN9cvcl1Cay/DQ/xdtY9N
+XMyjeMezwLGlOU8nnWSqQxqgmfkvDwqlM82xdFUfYcS5RiZQHxHR3L2TSSOaBoph
+buDGhyV7ZhQXV0slNJxrGZ6uxZ0RyVPSdEiBcjAFAoGBAJqZ6uCVHpv/FwZVggu7
+Xb9EIxZnLSmXwaXFpJoMZpRpKb8cSTTJbgSMv3Dq2LcNKYXdNBhgKgPSc/XipXt9
+ZdT36KWipV+PzW691kUiWHtA8/+E0LCi4Y7rlcBMz9PgDNXK4XMMZOVKxDqPcHSJ
+P6y01ku7T2X+abUiJ334Hg6G
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-all-shade-test/src/test/resources/pulsar.xml b/tests/pulsar-client-all-shade-test/src/test/resources/pulsar.xml
index 24f6e97..746cc3d 100644
--- a/tests/pulsar-client-all-shade-test/src/test/resources/pulsar.xml
+++ b/tests/pulsar-client-all-shade-test/src/test/resources/pulsar.xml
@@ -25,6 +25,7 @@
     <test name="pulsar-client-all-shade-suite" preserve-order="true" >
         <classes>
             <class name="org.apache.pulsar.tests.integration.SmokeTest" />
+            <class name="org.apache.pulsar.tests.integration.SimpleProducerConsumerTest" />
         </classes>
     </test>
 </suite>