You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2022/07/21 00:49:39 UTC

[pulsar] branch master updated: Support encryption in websocket proxy (#16234)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52da03f08f4 Support encryption in websocket proxy (#16234)
52da03f08f4 is described below

commit 52da03f08f44aa841ba5510ecb83b8834ba8964c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jul 20 17:49:32 2022 -0700

    Support encryption in websocket proxy (#16234)
    
    fix doc
---
 .../configuration/PulsarConfigurationLoader.java   |  16 +-
 .../proxy/ProxyEncryptionPublishConsumeTest.java   | 249 +++++++++++++++++++++
 .../apache/pulsar/websocket/ConsumerHandler.java   |   3 +
 .../pulsar/websocket/CryptoKeyReaderFactory.java   |  29 +++
 .../apache/pulsar/websocket/ProducerHandler.java   |  10 +
 .../org/apache/pulsar/websocket/ReaderHandler.java |   3 +
 .../apache/pulsar/websocket/WebSocketService.java  |  19 ++
 .../service/WebSocketProxyConfiguration.java       |   5 +
 site2/docs/client-libraries-websocket.md           |   7 +
 9 files changed, 338 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 12bba1a7fa2..b0479d81588 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -182,23 +182,33 @@ public class PulsarConfigurationLoader {
             final ServiceConfiguration convertedConf = ServiceConfiguration.class
                     .getDeclaredConstructor().newInstance();
             Field[] confFields = conf.getClass().getDeclaredFields();
+            Properties properties = new Properties();
             Arrays.stream(confFields).forEach(confField -> {
                 try {
-                    Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
                     confField.setAccessible(true);
+                    Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
                     if (!Modifier.isStatic(convertedConfField.getModifiers())) {
                         convertedConfField.setAccessible(true);
                         convertedConfField.set(convertedConf, confField.get(conf));
                     }
                 } catch (NoSuchFieldException e) {
                     if (!ignoreNonExistMember) {
-                        throw new IllegalArgumentException("Exception caused while converting configuration: "
-                                + e.getMessage());
+                        throw new IllegalArgumentException(
+                                "Exception caused while converting configuration: " + e.getMessage());
+                    }
+                    // add unknown fields to properties
+                    try {
+                        if (confField.get(conf) != null) {
+                            properties.put(confField.getName(), confField.get(conf));
+                        }
+                    } catch (Exception ignoreException) {
+                        // should not happen
                     }
                 } catch (IllegalAccessException e) {
                     throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage());
                 }
             });
+            convertedConf.getProperties().putAll(properties);
             return convertedConf;
         } catch (InstantiationException | IllegalAccessException
                 | InvocationTargetException | NoSuchMethodException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
new file mode 100644
index 00000000000..bbd2b3bd14f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.CryptoKeyReaderFactory;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+/** End-to-end test for publishing with {@code encryptionKeys} through the WebSocket proxy. */
+@Test(groups = "websocket")
+public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
+    protected String methodName;
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /** Runs the base-class setup, then starts a WebSocket proxy that uses {@link CryptoKeyReaderFactoryImpl}. */
+    @BeforeMethod
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
+        // assign the field (not a shadowing local) so that cleanup() can close the service
+        service = spy(new WebSocketService(config));
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /** Runs the base-class teardown, then closes the WebSocket service and stops the proxy server. */
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        try {
+            super.resetConfig();
+            super.internalCleanup();
+        } finally {
+            // release the proxy-side resources even when the base-class teardown throws
+            if (service != null) {
+                service.close();
+            }
+            if (proxyServer != null) {
+                proxyServer.stop();
+            }
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /** Publishes with an encryption key; a failover consumer pair and a reader must receive the messages. */
+    @Test(timeOut = 10000)
+    public void socketTest() throws Exception {
+        final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
+        String readerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1";
+        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1?encryptionKeys=client-ecdsa.pem";
+
+        URI consumeUri = URI.create(consumerUri);
+        URI readUri = URI.create(readerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+        WebSocketClient readClient = new WebSocketClient();
+        SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            consumeClient1.start();
+            consumeClient2.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+            Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+            log.info("Connecting to : {}", consumeUri);
+
+            readClient.start();
+            ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
+            Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
+            log.info("Connecting to : {}", readUri);
+
+            // let it connect
+            assertTrue(consumerFuture1.get().isOpen());
+            assertTrue(consumerFuture2.get().isOpen());
+            assertTrue(readerFuture.get().isOpen());
+
+            // Also make sure subscriptions and reader are already created
+            Thread.sleep(500);
+
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            produceClient.start();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            assertTrue(producerFuture.get().isOpen());
+
+            int retry = 0;
+            int maxRetry = 400;
+            while ((consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10)
+                    || readSocket.getReceivedMessagesCount() < 10) {
+                Thread.sleep(10);
+                if (retry++ > maxRetry) {
+                    final String msg = String.format("Consumer still has not received the message after %s ms",
+                            (maxRetry * 10));
+                    log.warn(msg);
+                    throw new IllegalStateException(msg);
+                }
+            }
+
+            // the subscription type is Failover (see consumerUri), so both consumer sessions must still be open
+            assertTrue(consumerFuture1.get().isOpen());
+            assertTrue(consumerFuture2.get().isOpen());
+            assertTrue(produceSocket.getBuffer().size() > 0);
+
+            // compare the producer's buffer against whichever consumer buffered more
+            if (consumeSocket1.getBuffer().size() > consumeSocket2.getBuffer().size()) {
+                assertEquals(produceSocket.getBuffer(), consumeSocket1.getBuffer());
+            } else {
+                assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer());
+            }
+            assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
+        } finally {
+            stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient);
+        }
+    }
+
+    /** Factory configured by class name in setup(); every call returns the same shared {@link EncKeyReader}. */
+    public static class CryptoKeyReaderFactoryImpl implements CryptoKeyReaderFactory {
+        private static final EncKeyReader reader = new EncKeyReader();
+
+        @Override
+        public CryptoKeyReader create() {
+            return reader;
+        }
+    }
+
+    /** Test key reader that loads key files from {@code ./src/test/resources/certificate/}. */
+    public static class EncKeyReader implements CryptoKeyReader {
+        @Override
+        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+            return loadKey("./src/test/resources/certificate/public-key." + keyName);
+        }
+
+        @Override
+        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+            return loadKey("./src/test/resources/certificate/private-key." + keyName);
+        }
+
+        // Builds a fresh EncryptionKeyInfo per call: the factory shares one reader between the producer,
+        // consumers and reader, so a single mutable key holder could be overwritten while still in use.
+        private static EncryptionKeyInfo loadKey(String keyFilePath) {
+            if (!Files.isReadable(Paths.get(keyFilePath))) {
+                Assert.fail("Certificate file " + keyFilePath + " is not present or not readable.");
+            }
+            try {
+                EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+                keyInfo.setKey(Files.readAllBytes(Paths.get(keyFilePath)));
+                return keyInfo;
+            } catch (IOException e) {
+                Assert.fail("Failed to read certificate from " + keyFilePath, e);
+                return null; // unreachable: Assert.fail always throws
+            }
+        }
+    }
+
+    /** Stops the given clients on a helper thread, waiting at most two seconds for all of them. */
+    private void stopWebSocketClient(WebSocketClient... clients) {
+        @Cleanup("shutdownNow")
+        ExecutorService executor = newFixedThreadPool(1);
+        try {
+            executor.submit(() -> {
+                for (WebSocketClient client : clients) {
+                    try {
+                        client.stop();
+                    } catch (Exception e) {
+                        log.error("failed to stop proxy client", e);
+                    }
+                }
+                log.info("proxy clients are stopped successfully");
+            }).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close proxy clients", e);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ProxyEncryptionPublishConsumeTest.class);
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 78179a8dd7c..7ecc047bb4a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -454,6 +454,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
             }
         }
 
+        if (service.getCryptoKeyReader().isPresent()) {
+            builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+        }
         return builder;
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java
new file mode 100644
index 00000000000..cad806c8788
--- /dev/null
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+
+/**
+ * Factory for the {@link CryptoKeyReader} used by WebSocketService; implementations need a no-arg constructor.
+ */
+public interface CryptoKeyReaderFactory {
+    /** Creates the reader; WebSocketService treats a {@code null} result as "no reader configured". */
+    CryptoKeyReader create();
+}
\ No newline at end of file
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ca3e8ecbc3..760cb28b632 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory;
 
 public class ProducerHandler extends AbstractWebSocketHandler {
 
+    private WebSocketService service;
     private Producer<byte[]> producer;
     private final LongAdder numMsgsSent;
     private final LongAdder numMsgsFailed;
@@ -83,6 +84,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         this.numBytesSent = new LongAdder();
         this.numMsgsFailed = new LongAdder();
         this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
+        this.service = service;
 
         if (!checkAuth(response)) {
             return;
@@ -328,6 +330,14 @@ public class ProducerHandler extends AbstractWebSocketHandler {
             builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
         }
 
+        if (queryParams.containsKey("encryptionKeys")) {
+            builder.cryptoKeyReader(service.getCryptoKeyReader().orElseThrow(() -> new IllegalStateException(
+                    "Can't add encryption key without configuring cryptoKeyReaderFactoryClassName")));
+            String[] keys = queryParams.get("encryptionKeys").split(",");
+            for (String key : keys) {
+                builder.addEncryptionKey(key);
+            }
+        }
         return builder;
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 6510386aef9..3c1cd2a9137 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -102,6 +102,9 @@ public class ReaderHandler extends AbstractWebSocketHandler {
                     log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
                 }
             }
+            if (service.getCryptoKeyReader().isPresent()) {
+                builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+            }
 
             this.reader = builder.create();
             Consumer<?> consumer = getConsumer();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 710e367af31..2a4a5977f7e 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -23,17 +23,21 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.servlet.ServletException;
 import javax.websocket.DeploymentException;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SizeUnit;
@@ -64,6 +68,8 @@ public class WebSocketService implements Closeable {
     private PulsarResources pulsarResources;
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
+    @Getter
+    private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
 
     private ClusterData localCluster;
     private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
@@ -118,6 +124,19 @@ public class WebSocketService implements Closeable {
         }
         // start authentication service
         authenticationService = new AuthenticationService(this.config);
+        // initialize crypto key reader
+        String cryptoFactoryClassName = (String) config.getProperties().get("cryptoKeyReaderFactoryClassName");
+        if (StringUtils.isNotBlank(cryptoFactoryClassName)) {
+            try {
+                CryptoKeyReaderFactory factoryInstance = (CryptoKeyReaderFactory) Class.forName(cryptoFactoryClassName)
+                        .getDeclaredConstructor().newInstance();
+                cryptoKeyReader = Optional.ofNullable(factoryInstance.create());
+            } catch (Exception e) {
+                log.info("Failed to initialize crypto-key reader", e);
+                throw new PulsarServerException(e);
+            }
+        }
+
         log.info("Pulsar WebSocket Service started");
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 1c061b9da11..8fb3ab7ed01 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -245,6 +245,11 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     )
     private Set<String> webServiceTlsCiphers = new TreeSet<>();
 
+    @FieldContext(
+            doc = "CryptoKeyReader factory classname to support encryption at websocket."
+    )
+    private String cryptoKeyReaderFactoryClassName;
+
     @FieldContext(doc = "Key-value properties. Types are all String")
     private Properties properties = new Properties();
 
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 6c84ba3d8ac..c5455b05388 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -60,6 +60,12 @@ tlsTrustCertsFilePath=/path/to/ca.cert.pem
 
 ```
 
+To enable encryption at rest on the WebSocket service, add a `CryptoKeyReaderFactory` implementation to the classpath and set its class name in the WebSocket configuration. The factory creates the `CryptoKeyReader` that the WebSocket service uses to load encryption keys for producers and consumers.
+
+```
+cryptoKeyReaderFactoryClassName=org.apache.pulsar.MyCryptoKeyReaderFactoryClassImpl
+```
+
 ### Starting the broker
 
 When the configuration is set, you can start the service using the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) tool:
@@ -113,6 +119,7 @@ Key | Type | Required? | Explanation
 `initialSequenceId` | long | no | Set the baseline for the sequence ids for messages published by the producer.
 `hashingScheme` | string | no | [Hashing function](/api/client/org/apache/pulsar/client/api/ProducerConfiguration.HashingScheme.html) to use when publishing on a partitioned topic: `JavaStringHash`, `Murmur3_32Hash`
 `token` | string | no | Authentication token, this is used for the browser javascript client
+`encryptionKeys` | string | no | Comma-separated names of the encryption keys used to encrypt published messages. Only allowed when a crypto key reader is configured through the `cryptoKeyReaderFactoryClassName` setting in the WebSocket configuration.
 
 
 #### Publishing a message