Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/15 12:22:05 UTC

[GitHub] [pulsar] RobertIndie opened a new pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

RobertIndie opened a new pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962


   ### Motivation
   Currently, there are no encryption integration shade test cases for pulsar-client and pulsar-client-admin.
   #8850 added these test cases for pulsar-client-all.
   
   ### Modification
   * Add encryption integration test cases for pulsar-client and pulsar-client-admin.
   * Include the shading of pulsar-client-api.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] RobertIndie commented on pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#issuecomment-747950685


   /pulsarbot run-failure-checks





[GitHub] [pulsar] merlimat commented on a change in pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#discussion_r545998010



##########
File path: pulsar-client-shaded/pom.xml
##########
@@ -104,6 +104,7 @@
               <artifactSet>
                 <includes>
                   <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.pulsar:pulsar-client-api</include>

Review comment:
       The client API is intentionally not included in the shaded artifact; otherwise IDEs will not be able to fetch the sources and Javadoc for the client APIs.
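
   One way to see which artifact actually serves a given API class at runtime is to ask the JVM for the class's code source. This is only a sketch: `ClassOrigin`, `locationOf`, and `describe` are hypothetical names, not part of Pulsar, and the JVM may report no code source at all (for example for JDK classes), so the result is treated as optional.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

public class ClassOrigin {

    // Returns the jar or directory a class was loaded from, if the JVM exposes it.
    public static Optional<URL> locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return Optional.ofNullable(source).map(CodeSource::getLocation);
    }

    // Formats "<class name> <- <location>" for quick inspection in logs.
    public static String describe(Class<?> type) {
        return type.getName() + " <- "
                + locationOf(type).map(URL::toString).orElse("(no code source reported)");
    }

    public static void main(String[] args) {
        System.out.println(describe(ClassOrigin.class));
        System.out.println(describe(String.class));
    }
}
```

   In a project that depends on the shaded client, passing an API class such as `PulsarClient.class` to `describe` would show whether it is loaded from a separate pulsar-client-api jar or from the shaded jar itself.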







[GitHub] [pulsar] RobertIndie commented on a change in pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#discussion_r546561364



##########
File path: pulsar-client-shaded/pom.xml
##########
@@ -104,6 +104,7 @@
               <artifactSet>
                 <includes>
                   <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.pulsar:pulsar-client-api</include>

Review comment:
       But if we don't include pulsar-client-api, some interfaces may not be relocated correctly.
   For example, in SimpleProducerConsumerTest:558:
   ```java
   MessageCrypto crypto = new MessageCryptoBc("test", false);
   PulsarApi.MessageMetadata.Builder metadataBuilder = PulsarApi.MessageMetadata.newBuilder();
   PulsarApi.EncryptionKeys.Builder encKeyBuilder = PulsarApi.EncryptionKeys.newBuilder();
   encKeyBuilder.setKey(encryptionKeyName);
   ByteString keyValue = ByteString.copyFrom(dataKey);
   encKeyBuilder.setValue(keyValue);
   PulsarApi.EncryptionKeys encKey = encKeyBuilder.build();
   metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
   metadataBuilder.setEncryptionAlgo(encAlgo);
   metadataBuilder.setProducerName("test");
   metadataBuilder.setSequenceId(123);
   metadataBuilder.setPublishTime(12333453454L);
   metadataBuilder.addEncryptionKeys(encKey);
   metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType));
   metadataBuilder.setUncompressedSize(uncompressedSize);
   ByteBuf decryptedPayload = crypto.decrypt(() -> metadataBuilder.build(), payloadBuf, reader); 
   ```
   The last line fails with the following error:
   `'decrypt(java.util.function.Supplier, io.netty.buffer.ByteBuf, org.apache.pulsar.client.api.CryptoKeyReader)' in 'org.apache.pulsar.client.api.MessageCrypto' cannot be applied to '(<lambda expression>, org.apache.pulsar.shade.io.netty.buffer.ByteBuf, org.apache.pulsar.client.api.CryptoKeyReader)'`
   
   So we need to include pulsar-client-api so that the signature
   ```
   io.netty.buffer.ByteBuf decrypt(java.util.function.Supplier<MetadataT> supplier, io.netty.buffer.ByteBuf byteBuf, org.apache.pulsar.client.api.CryptoKeyReader cryptoKeyReader);
   ```
   is relocated to:
   ```
   org.apache.pulsar.shade.io.netty.buffer.ByteBuf decrypt(java.util.function.Supplier<MetadataT> supplier, org.apache.pulsar.shade.io.netty.buffer.ByteBuf byteBuf, org.apache.pulsar.client.api.CryptoKeyReader cryptoKeyReader);
   ```
   
   Currently, I haven't found another solution to solve this problem.
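
   The mismatch described above can be illustrated with a toy model of relocation. This is a sketch under stated assumptions, not the maven-shade-plugin's implementation: `RelocationSketch`, `relocate`, `seenType`, and the single prefix rule are hypothetical, chosen to mirror the `io.netty` to `org.apache.pulsar.shade.io.netty` mapping visible in the error message. The model assumes that type references are rewritten only in classes that are part of the shaded jar, so an interface left outside keeps the original parameter type while shaded callers pass the relocated one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RelocationSketch {

    // Hypothetical relocation rule: original package prefix -> shaded prefix.
    static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("io.netty.", "org.apache.pulsar.shade.io.netty.");
    }

    // Rewrites a fully qualified type name if a rule matches its prefix.
    public static String relocate(String typeName) {
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            if (typeName.startsWith(rule.getKey())) {
                return rule.getValue() + typeName.substring(rule.getKey().length());
            }
        }
        return typeName;
    }

    // The type a class refers to, depending on whether that class
    // was itself processed by the shading step.
    public static String seenType(String declaredType, boolean classIsShaded) {
        return classIsShaded ? relocate(declaredType) : declaredType;
    }

    public static void main(String[] args) {
        String declared = "io.netty.buffer.ByteBuf";
        String callerType = seenType(declared, true);   // code using the shaded ByteBuf
        String apiExcluded = seenType(declared, false); // API interface left out of the shaded jar
        String apiIncluded = seenType(declared, true);  // API interface included and relocated

        System.out.println("API excluded, types match: " + callerType.equals(apiExcluded));
        System.out.println("API included, types match: " + callerType.equals(apiIncluded));
    }
}
```

   In this model the caller and the interface agree on the `ByteBuf` type only when the interface is processed by the same relocation rules, which is the effect the added `<include>` is meant to achieve.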







[GitHub] [pulsar] jbampton commented on a change in pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
jbampton commented on a change in pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#discussion_r543371155



##########
File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();

Review comment:
       ```suggestion
           String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
   ```

+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance to process the message again.
+         * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers
+         * - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order

Review comment:
       ```suggestion
           Set<String> messages = new HashSet();// Since messages are in random order
   ```

##########
File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // Should not be able to read the message with a normal consumer (no CryptoKeyReader configured).
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer gets another chance to process a message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave a mix of consumers:
+         * some that can decrypt and some that cannot (due to errors or a missing CryptoKeyReader).
+         *
+         * In that case all messages should eventually be acked, as long as at least one consumer can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```
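
   For readers following the test logic rather than the formatting: the redelivery scenario described in the block comment of this hunk can be approximated without a broker. The sketch below is a toy model under stated assumptions, not Pulsar's dispatcher. `RedeliverySketch` and `drain` are hypothetical names, each consumer is reduced to a predicate that says whether it can decrypt a message, and putting a message back on the queue stands in for an ack-timeout redelivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Toy model of shared-subscription redelivery; NOT Pulsar's actual dispatcher. */
final class RedeliverySketch {

    /**
     * Hands each pending message to the consumers in round-robin order.
     * A consumer that cannot decrypt a message never acks it, so the message
     * goes back on the queue (a stand-in for ack-timeout redelivery).
     *
     * @return the messages that were acked, in ack order
     */
    static Set<String> drain(List<String> messages, List<Predicate<String>> consumers) {
        Deque<String> pending = new ArrayDeque<>(messages);
        Set<String> acked = new LinkedHashSet<>();
        int next = 0;
        // Bound the number of deliveries so the loop ends even if nobody can decrypt.
        int attemptsLeft = messages.size() * consumers.size();
        while (!pending.isEmpty() && attemptsLeft-- > 0) {
            String msg = pending.poll();
            Predicate<String> consumer = consumers.get(next);
            next = (next + 1) % consumers.size();
            if (consumer.test(msg)) {
                acked.add(msg);       // decrypted and acked
            } else {
                pending.addLast(msg); // not acked: redeliver later
            }
        }
        return acked;
    }
}
```

   With one predicate that always returns true and two that always return false (mirroring consumers 1, 2 and 3 in the test), every message ends up acked by the first consumer. With only failing predicates nothing is acked, which matches the `assertNull` checks on consumer2 and consumer3.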

##########
File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // Should not be able to read the message with a normal consumer (no CryptoKeyReader configured).
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer gets another chance to process a message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave a mix of consumers:
+         * some that can decrypt and some that cannot (due to errors or a missing CryptoKeyReader).
+         *
+         * In that case all messages should eventually be acked, as long as at least one consumer can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order

Review comment:
       ```suggestion
           Set<String> messages = new HashSet();// Since messages are in random order
   ```
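
   Separately from the spacing, the line under review uses the raw type `HashSet`. As a minimal JDK-only sketch (the class and method names here are hypothetical, not part of the PR), the diamond operator keeps the set parameterized, and the return value of `Set.add` is enough to detect duplicates when messages arrive in arbitrary order:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: duplicate detection for messages received in any order. */
final class UnorderedMessages {

    /** Returns the first duplicated payload, or null if every payload is unique. */
    static String firstDuplicate(List<String> received) {
        Set<String> seen = new HashSet<>(); // diamond operator: typed, no raw-type warning
        for (String payload : received) {
            if (!seen.add(payload)) {       // add() returns false if already present
                return payload;
            }
        }
        return null;
    }
}
```

   This is the same idea the test's `testMessageOrderAndDuplicates` helper relies on when it asserts on `messagesReceived.add(receivedMessage)`, just without the ordering check.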

##########
File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();

Review comment:
       ```suggestion
           String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
   ```
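
   A possible follow-up, not required for this PR: the `getPublicKey` and `getPrivateKey` bodies in both `EncKeyReader` classes differ only in the file-name prefix. A rough sketch of a shared loader, assuming the `public-key.<name>` / `private-key.<name>` layout used in the diff, might look like the following. `KeyFiles`, `load` and the `kind` parameter are hypothetical names, and throwing an exception instead of calling `Assert.fail` is a choice made here so the sketch stays JDK-only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that reads "<kind>-key.<keyName>" from a directory. */
final class KeyFiles {

    /**
     * @param dir     directory that holds the key files
     * @param kind    "public" or "private" (assumed naming convention)
     * @param keyName key name as passed to the reader, e.g. "client-rsa.pem"
     * @return raw bytes of the key file
     */
    static byte[] load(Path dir, String kind, String keyName) {
        Path file = dir.resolve(kind + "-key." + keyName);
        if (!Files.isReadable(file)) {
            throw new IllegalStateException("Key file " + file + " is not present or not readable.");
        }
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read key from " + file, e);
        }
    }
}
```

   Each reader method could then reduce to a `keyInfo.setKey(...)` call on the loaded bytes, with the test deciding how to surface a failure.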

##########
File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not be able to read the message using the normal consumer.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer gets another chance to process a message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message
+         * until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave
+         * a mix of consumers: some that can decrypt and some that can't (due to errors or a missing
+         * cryptoKeyReader).
+         *
+         * In that case, all messages should eventually be acked as long as at least one consumer
+         * can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet<>(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
   ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not be able to read the message using the normal consumer.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer gets another chance to process a message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message
+         * until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave
+         * a mix of consumers: some that can decrypt and some that can't (due to errors or a missing
+         * cryptoKeyReader).
+         *
+         * In that case, all messages should eventually be acked as long as at least one consumer
+         * can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet<>(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
   ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not be able to read the message using the normal (non-decrypting) consumer.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer will get another chance to process an unacknowledged message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave a mix of consumers:
+         * some that can decrypt and some that can't (due to errors or no cryptoKeyReader configured).
+         *
+         * In that case, all messages should eventually be acked as long as at least one consumer can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet<>(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
   ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```
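
The redelivery scenario described in the block comment of the hunk above (one consumer that can decrypt, two that cannot, all on a shared subscription) can be illustrated without a broker. The following is only a toy model in plain Java, not Pulsar's dispatcher: the class `RedeliverySketch`, the method `run`, and the round-robin hand-off are assumptions made for illustration, and putting a message back on the queue stands in for the ack timeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Toy model of a shared subscription. Hypothetical names; NOT Pulsar's dispatcher.
final class RedeliverySketch {

    /**
     * Offers pending messages to the consumers in round-robin order.
     * Each predicate answers "can this consumer decrypt the message?".
     * A consumer that cannot decrypt never acks, so the message is put back
     * on the queue (standing in for the ack timeout) and offered again later.
     * Stops when nothing is pending or after maxDeliveries attempts.
     */
    static Set<String> run(List<String> messages,
                           List<Predicate<String>> canDecrypt,
                           int maxDeliveries) {
        Deque<String> pending = new ArrayDeque<>(messages);
        Set<String> acked = new HashSet<>();
        int next = 0;        // index of the consumer that gets the next delivery
        int deliveries = 0;  // total delivery attempts so far
        while (!pending.isEmpty() && deliveries < maxDeliveries) {
            String msg = pending.poll();
            Predicate<String> consumer = canDecrypt.get(next);
            next = (next + 1) % canDecrypt.size();
            deliveries++;
            if (consumer.test(msg)) {
                acked.add(msg);    // decrypted and acknowledged
            } else {
                pending.add(msg);  // left unacked, so it is redelivered later
            }
        }
        return acked;
    }
}
```

In this model, as long as at least one consumer can decrypt, every message is eventually acked, because that consumer receives a pending message on each of its turns. If no consumer can decrypt, the loop ends at `maxDeliveries` with nothing acked.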

##########
File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not be able to read the message using the normal (non-decrypting) consumer.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery guarantees that a consumer will get another chance to process an unacknowledged message.
+         * With a shared subscription, every consumer eventually gets a chance to process the message until one of them acks it.
+         *
+         * With encryption enabled, a new production rollout or a buggy client configuration can leave a mix of consumers:
+         * some that can decrypt and some that can't (due to errors or no cryptoKeyReader configured).
+         *
+         * In that case, all messages should eventually be acked as long as at least one consumer can decrypt them.
+         *
+         * Consumer 1 - Can decrypt messages.
+         * Consumer 2 - Has an invalid reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet<>(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+
+        // Consuming from consumer 2 and 3 again just to be sure
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
   ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```
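
Because messages on a shared subscription may arrive in any order, the "checking if all messages were received" loop in the hunk above has to be set-based rather than positional. A minimal sketch of such a check follows, assuming the payloads are the prefix followed by the index, as in the test; `ReceivedCheckSketch` and `missing` are hypothetical names, not part of this PR. Returning the missing payloads rather than a boolean would let an assertion message say exactly what was lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for an order-independent "all messages received" check.
final class ReceivedCheckSketch {

    /**
     * Returns, in index order, every expected payload
     * (prefix + 0 through prefix + (count - 1)) that is absent from received.
     * An empty result means all expected messages were seen.
     */
    static List<String> missing(String prefix, int count, Set<String> received) {
        List<String> absent = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String expected = prefix + i;
            if (!received.contains(expected)) {
                absent.add(expected);
            }
        }
        return absent;
    }
}
```

In the test this could be used as `assertTrue(missing(message, numberOfMessages, messages).isEmpty())`, with the returned list included in the failure message.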

##########
File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+        Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+        // should not be able to read the message using the normal (non-decrypting) consumer.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that a consumer will get a chance to process the message again.
+         * With a shared subscription, eventually every client will get a chance to process the message, until one of them acks it.
+         *
+         * For clients with encryption enabled, in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers:
+         * a few which can decrypt and a few which can't (due to errors or a cryptoReader that is not configured).
+         *
+         * In that case, eventually all messages should be acked as long as there is a single consumer that can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+        Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+
+        // Consuming from consumer 2 and 3 again just to be sure
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
       ```
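
The block comment in the quoted test argues that, on a shared subscription, every message is eventually acked as long as one consumer can decrypt it. The following is a minimal standalone sketch of that reasoning only. It does not use the Pulsar client; `RedeliverySketch`, `SimConsumer`, and `drain` are hypothetical names, and the round-robin dispatch plus "put it back on the queue" step are assumptions standing in for the broker's dispatch and ack-timeout redelivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RedeliverySketch {

    /** Hypothetical stand-in for one consumer on a shared subscription. */
    static final class SimConsumer {
        final String name;
        final boolean canDecrypt;
        final Set<String> acked = new HashSet<>();

        SimConsumer(String name, boolean canDecrypt) {
            this.name = name;
            this.canDecrypt = canDecrypt;
        }
    }

    /**
     * Dispatches pending messages round-robin (an assumption, not broker behavior).
     * A consumer that cannot decrypt never acks, so the message is re-queued,
     * which models redelivery after an ack timeout.
     * Returns the number of dispatch attempts needed to drain the queue.
     */
    static int drain(List<String> messages, List<SimConsumer> consumers) {
        if (consumers.stream().noneMatch(c -> c.canDecrypt)) {
            throw new IllegalArgumentException("no consumer can decrypt; the queue would never drain");
        }
        Deque<String> pending = new ArrayDeque<>(messages);
        int attempts = 0;
        while (!pending.isEmpty()) {
            String msg = pending.poll();
            SimConsumer target = consumers.get(attempts % consumers.size());
            attempts++;
            if (target.canDecrypt) {
                target.acked.add(msg);   // processed and acknowledged
            } else {
                pending.add(msg);        // not acked: goes back for redelivery
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            messages.add("my-message" + i);
        }
        SimConsumer consumer1 = new SimConsumer("consumer1", true);   // valid key reader
        SimConsumer consumer2 = new SimConsumer("consumer2", false);  // invalid key reader
        SimConsumer consumer3 = new SimConsumer("consumer3", false);  // no key reader
        int attempts = drain(messages, Arrays.asList(consumer1, consumer2, consumer3));
        System.out.println("dispatch attempts: " + attempts);
        System.out.println("acked by consumer1: " + consumer1.acked.size());
    }
}
```

In this model the two consumers that cannot decrypt only add dispatch attempts; they never change which consumer ends up acknowledging, which is the property the test asserts with `assertNull` on consumer 2 and consumer 3.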







[GitHub] [pulsar] codelipenghui merged pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962


   





[GitHub] [pulsar] jiazhai commented on a change in pull request #8962: [ShadeTest]Add integration shade test for pulsar-client and pulsar-cient-admin.

Posted by GitBox <gi...@apache.org>.
jiazhai commented on a change in pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#discussion_r548534150



##########
File path: pulsar-client-shaded/pom.xml
##########
@@ -104,6 +104,7 @@
               <artifactSet>
                 <includes>
                   <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.pulsar:pulsar-client-api</include>

Review comment:
       It seems the latest IntelliJ IDEA has fixed the shade issue.



