You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2022/07/21 00:49:39 UTC
[pulsar] branch master updated: Support encryption in websocket proxy (#16234)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52da03f08f4 Support encryption in websocket proxy (#16234)
52da03f08f4 is described below
commit 52da03f08f44aa841ba5510ecb83b8834ba8964c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jul 20 17:49:32 2022 -0700
Support encryption in websocket proxy (#16234)
fix doc
---
.../configuration/PulsarConfigurationLoader.java | 16 +-
.../proxy/ProxyEncryptionPublishConsumeTest.java | 249 +++++++++++++++++++++
.../apache/pulsar/websocket/ConsumerHandler.java | 3 +
.../pulsar/websocket/CryptoKeyReaderFactory.java | 29 +++
.../apache/pulsar/websocket/ProducerHandler.java | 10 +
.../org/apache/pulsar/websocket/ReaderHandler.java | 3 +
.../apache/pulsar/websocket/WebSocketService.java | 19 ++
.../service/WebSocketProxyConfiguration.java | 5 +
site2/docs/client-libraries-websocket.md | 7 +
9 files changed, 338 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 12bba1a7fa2..b0479d81588 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -182,23 +182,33 @@ public class PulsarConfigurationLoader {
final ServiceConfiguration convertedConf = ServiceConfiguration.class
.getDeclaredConstructor().newInstance();
Field[] confFields = conf.getClass().getDeclaredFields();
+ Properties properties = new Properties();
Arrays.stream(confFields).forEach(confField -> {
try {
- Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
confField.setAccessible(true);
+ Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName());
if (!Modifier.isStatic(convertedConfField.getModifiers())) {
convertedConfField.setAccessible(true);
convertedConfField.set(convertedConf, confField.get(conf));
}
} catch (NoSuchFieldException e) {
if (!ignoreNonExistMember) {
- throw new IllegalArgumentException("Exception caused while converting configuration: "
- + e.getMessage());
+ throw new IllegalArgumentException(
+ "Exception caused while converting configuration: " + e.getMessage());
+ }
+ // add unknown fields to properties
+ try {
+ if (confField.get(conf) != null) {
+ properties.put(confField.getName(), confField.get(conf));
+ }
+ } catch (Exception ignoreException) {
+ // should not happen
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage());
}
});
+ convertedConf.getProperties().putAll(properties);
return convertedConf;
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException | NoSuchMethodException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
new file mode 100644
index 00000000000..bbd2b3bd14f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.CryptoKeyReaderFactory;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+@Test(groups = "websocket")
+public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
+ // Not read or written anywhere within this class.
+ protected String methodName;
+
+ // WebSocket proxy server created in setup() and stopped in cleanup() when non-null.
+ private ProxyServer proxyServer;
+ // WebSocket service; cleanup() closes it when non-null.
+ private WebSocketService service;
+
+ // Value passed to conf.setBacklogQuotaCheckIntervalInSeconds(...) in setup().
+ private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /**
+     * Runs the base-class setup and starts a WebSocket proxy whose configuration names
+     * {@link CryptoKeyReaderFactoryImpl} as the crypto key reader factory.
+     *
+     * @throws Exception if the base-class setup or the proxy start-up fails
+     */
+    @BeforeMethod
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        // The tests read the actual listening port back from proxyServer.getListenPortHTTP();
+        // port 0 presumably lets the server pick a free one (TODO confirm).
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
+        // Assign the field rather than a shadowing local variable, otherwise cleanup() sees
+        // a null service and never closes it.
+        service = spy(new WebSocketService(config));
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+ /**
+ * Tears down the test fixture: resets the configuration and runs the base-class cleanup, then
+ * closes the WebSocket service and stops the proxy server when those fields are non-null.
+ *
+ * @throws Exception if any of the teardown steps fails
+ */
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.resetConfig();
+ super.internalCleanup();
+ // Both fields may still be null if setup() failed before assigning them.
+ if (service != null) {
+ service.close();
+ }
+ if (proxyServer != null) {
+ proxyServer.stop();
+ }
+ log.info("Finished Cleaning Up Test setup");
+ }
+
+ @Test(timeOut = 10000)
+ public void socketTest() throws Exception {
+ final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
+ String readerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/reader/persistent/my-property/my-ns/my-topic1";
+ String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1?encryptionKeys=client-ecdsa.pem";
+
+ URI consumeUri = URI.create(consumerUri);
+ URI readUri = URI.create(readerUri);
+ URI produceUri = URI.create(producerUri);
+
+ WebSocketClient consumeClient1 = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+ WebSocketClient consumeClient2 = new WebSocketClient();
+ SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+ WebSocketClient readClient = new WebSocketClient();
+ SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
+ WebSocketClient produceClient = new WebSocketClient();
+ SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+ try {
+ consumeClient1.start();
+ consumeClient2.start();
+ ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+ ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
+ Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+ Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+ log.info("Connecting to : {}", consumeUri);
+
+ readClient.start();
+ ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
+ Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
+ log.info("Connecting to : {}", readUri);
+
+ // let it connect
+ assertTrue(consumerFuture1.get().isOpen());
+ assertTrue(consumerFuture2.get().isOpen());
+ assertTrue(readerFuture.get().isOpen());
+
+ // Also make sure subscriptions and reader are already created
+ Thread.sleep(500);
+
+ ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+ produceClient.start();
+ Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+ assertTrue(producerFuture.get().isOpen());
+
+ int retry = 0;
+ int maxRetry = 400;
+ while ((consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10)
+ || readSocket.getReceivedMessagesCount() < 10) {
+ Thread.sleep(10);
+ if (retry++ > maxRetry) {
+ final String msg = String.format("Consumer still has not received the message after %s ms",
+ (maxRetry * 10));
+ log.warn(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ // if the subscription type is exclusive (default), either of the
+ // consumer
+ // sessions has already been closed
+ assertTrue(consumerFuture1.get().isOpen());
+ assertTrue(consumerFuture2.get().isOpen());
+ assertTrue(produceSocket.getBuffer().size() > 0);
+
+ if (consumeSocket1.getBuffer().size() > consumeSocket2.getBuffer().size()) {
+ assertEquals(produceSocket.getBuffer(), consumeSocket1.getBuffer());
+ } else {
+ assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer());
+ }
+ assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
+ } finally {
+ stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient);
+ }
+ }
+
+    /**
+     * {@link CryptoKeyReaderFactory} used by this test. Every call to {@link #create()} returns the same
+     * {@link EncKeyReader} instance.
+     */
+    public static class CryptoKeyReaderFactoryImpl implements CryptoKeyReaderFactory {
+
+        private static final EncKeyReader SHARED_READER = new EncKeyReader();
+
+        /**
+         * @return the single {@link EncKeyReader} held by this factory class
+         */
+        @Override
+        public CryptoKeyReader create() {
+            return SHARED_READER;
+        }
+
+    }
+
+    /**
+     * {@link CryptoKeyReader} for this test that loads key material from files under
+     * {@code ./src/test/resources/certificate/}.
+     *
+     * <p>Each lookup returns a fresh {@link EncryptionKeyInfo}. The reader instance is shared through
+     * {@link CryptoKeyReaderFactoryImpl}, and a single mutable key holder would let a private-key lookup
+     * overwrite the bytes of a public-key result that is still in use (and vice versa).
+     */
+    public static class EncKeyReader implements CryptoKeyReader {
+
+        private static final String CERT_DIR = "./src/test/resources/certificate/";
+
+        /**
+         * @param keyName suffix of the key file; the file read is {@code public-key.<keyName>}
+         * @param keyMeta not used by this implementation
+         * @return a new key info holding the bytes of the public key file
+         */
+        @Override
+        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
+            return loadKey(CERT_DIR + "public-key." + keyName);
+        }
+
+        /**
+         * @param keyName suffix of the key file; the file read is {@code private-key.<keyName>}
+         * @param keyMeta not used by this implementation
+         * @return a new key info holding the bytes of the private key file
+         */
+        @Override
+        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
+            return loadKey(CERT_DIR + "private-key." + keyName);
+        }
+
+        /** Reads the file at {@code certFilePath} into a new key info, failing the test if it cannot be read. */
+        private static EncryptionKeyInfo loadKey(String certFilePath) {
+            if (!Files.isReadable(Paths.get(certFilePath))) {
+                Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
+            }
+            try {
+                EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+                keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                return keyInfo;
+            } catch (IOException e) {
+                // Keep the I/O failure as the cause so the test report shows why the read failed.
+                Assert.fail("Failed to read certificate from " + certFilePath, e);
+            }
+            // Not reached: Assert.fail always throws; the compiler still requires a return.
+            return null;
+        }
+    }
+
+    /**
+     * Stops the given WebSocket clients one after another on a single helper thread and waits at most two
+     * seconds for the whole batch. A failure to stop one client is logged and the remaining clients are still
+     * stopped; a timeout or any other failure of the batch is logged and not rethrown.
+     *
+     * @param clients the clients to stop, in the given order
+     */
+    private void stopWebSocketClient(WebSocketClient... clients) {
+        @Cleanup("shutdownNow")
+        ExecutorService stopper = newFixedThreadPool(1);
+        Runnable stopAll = () -> {
+            for (WebSocketClient webSocketClient : clients) {
+                try {
+                    webSocketClient.stop();
+                } catch (Exception e) {
+                    log.error(e.getMessage());
+                }
+            }
+            log.info("proxy clients are stopped successfully");
+        };
+        try {
+            stopper.submit(stopAll).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close proxy clients", e);
+        }
+    }
+
+ private static final Logger log = LoggerFactory.getLogger(ProxyEncryptionPublishConsumeTest.class);
+}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 78179a8dd7c..7ecc047bb4a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -454,6 +454,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
}
}
+ if (service.getCryptoKeyReader().isPresent()) {
+ builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+ }
return builder;
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java
new file mode 100644
index 00000000000..cad806c8788
--- /dev/null
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/CryptoKeyReaderFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+
+/**
+ * Factory for the {@link CryptoKeyReader} used by the WebSocket service.
+ *
+ * <p>{@code WebSocketService} loads the class named by the {@code cryptoKeyReaderFactoryClassName} property,
+ * instantiates it reflectively through its no-argument constructor and then calls {@link #create()}.
+ */
+public interface CryptoKeyReaderFactory {
+ /**
+ * Creates the crypto key reader.
+ *
+ * @return the reader to use; {@code WebSocketService} wraps the result with {@code Optional.ofNullable},
+ * so a {@code null} result leaves the service without a crypto key reader
+ */
+ CryptoKeyReader create();
+}
\ No newline at end of file
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ca3e8ecbc3..760cb28b632 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory;
public class ProducerHandler extends AbstractWebSocketHandler {
+ private WebSocketService service;
private Producer<byte[]> producer;
private final LongAdder numMsgsSent;
private final LongAdder numMsgsFailed;
@@ -83,6 +84,7 @@ public class ProducerHandler extends AbstractWebSocketHandler {
this.numBytesSent = new LongAdder();
this.numMsgsFailed = new LongAdder();
this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
+ this.service = service;
if (!checkAuth(response)) {
return;
@@ -328,6 +330,14 @@ public class ProducerHandler extends AbstractWebSocketHandler {
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
}
+ if (queryParams.containsKey("encryptionKeys")) {
+ builder.cryptoKeyReader(service.getCryptoKeyReader().orElseThrow(() -> new IllegalStateException(
+ "Can't add encryption key without configuring cryptoKeyReaderFactoryClassName")));
+ String[] keys = queryParams.get("encryptionKeys").split(",");
+ for (String key : keys) {
+ builder.addEncryptionKey(key);
+ }
+ }
return builder;
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 6510386aef9..3c1cd2a9137 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -102,6 +102,9 @@ public class ReaderHandler extends AbstractWebSocketHandler {
log.warn("Failed to configure cryptoFailureAction {}, {}", action, e.getMessage());
}
}
+ if (service.getCryptoKeyReader().isPresent()) {
+ builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+ }
this.reader = builder.create();
Consumer<?> consumer = getConsumer();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 710e367af31..2a4a5977f7e 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -23,17 +23,21 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
@@ -64,6 +68,8 @@ public class WebSocketService implements Closeable {
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
+ @Getter
+ private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
private ClusterData localCluster;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
@@ -118,6 +124,19 @@ public class WebSocketService implements Closeable {
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
+ // initialize crypto key reader
+ String cryptoFactoryClassName = (String) config.getProperties().get("cryptoKeyReaderFactoryClassName");
+ if (StringUtils.isNotBlank(cryptoFactoryClassName)) {
+ try {
+ CryptoKeyReaderFactory factoryInstance = (CryptoKeyReaderFactory) Class.forName(cryptoFactoryClassName)
+ .getDeclaredConstructor().newInstance();
+ cryptoKeyReader = Optional.ofNullable(factoryInstance.create());
+ } catch (Exception e) {
+ log.info("Failed to initialize crypto-key reader", e);
+ throw new PulsarServerException(e);
+ }
+ }
+
log.info("Pulsar WebSocket Service started");
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 1c061b9da11..8fb3ab7ed01 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -245,6 +245,11 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
)
private Set<String> webServiceTlsCiphers = new TreeSet<>();
+ @FieldContext(
+ doc = "CryptoKeyReader factory classname to support encryption at websocket."
+ )
+ private String cryptoKeyReaderFactoryClassName;
+
@FieldContext(doc = "Key-value properties. Types are all String")
private Properties properties = new Properties();
diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md
index 6c84ba3d8ac..c5455b05388 100644
--- a/site2/docs/client-libraries-websocket.md
+++ b/site2/docs/client-libraries-websocket.md
@@ -60,6 +60,12 @@ tlsTrustCertsFilePath=/path/to/ca.cert.pem
```
+To enable encryption at rest on the WebSocket service, add a `CryptoKeyReaderFactory` implementation to the classpath and configure its class name as shown below. The factory creates the `CryptoKeyReader` that the WebSocket service uses to load encryption keys for producers and consumers.
+
+```
+cryptoKeyReaderFactoryClassName=org.apache.pulsar.MyCryptoKeyReaderFactoryClassImpl
+```
+
### Starting the broker
When the configuration is set, you can start the service using the [`pulsar-daemon`](reference-cli-tools.md#pulsar-daemon) tool:
@@ -113,6 +119,7 @@ Key | Type | Required? | Explanation
`initialSequenceId` | long | no | Set the baseline for the sequence ids for messages published by the producer.
`hashingScheme` | string | no | [Hashing function](/api/client/org/apache/pulsar/client/api/ProducerConfiguration.HashingScheme.html) to use when publishing on a partitioned topic: `JavaStringHash`, `Murmur3_32Hash`
`token` | string | no | Authentication token, this is used for the browser javascript client
+`encryptionKeys` | string | no | Comma-separated names of the encryption keys used to encrypt published messages. Requires a crypto key reader to be configured through the `cryptoKeyReaderFactoryClassName` setting in the WebSocket configuration.
#### Publishing a message