You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/12/08 13:12:47 UTC
[pulsar-client-node] branch master updated: feat: Add chunking support (#234)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 7957253 feat: Add chunking support (#234)
7957253 is described below
commit 79572533c192f288124a5211377a7e76acdc2554
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Dec 8 21:12:42 2022 +0800
feat: Add chunking support (#234)
---
build-support/pulsar-test-container-start.sh | 4 ++++
build-support/pulsar-test-service-start.sh | 2 +-
index.d.ts | 3 +++
package.json | 2 +-
src/ConsumerConfig.cc | 21 ++++++++++++++++++
src/ProducerConfig.cc | 6 +++++
tests/conf/standalone.conf | 6 +++++
tests/end_to_end.test.js | 33 ++++++++++++++++++++++++++++
8 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/build-support/pulsar-test-container-start.sh b/build-support/pulsar-test-container-start.sh
index f51bfeb..96c7758 100755
--- a/build-support/pulsar-test-container-start.sh
+++ b/build-support/pulsar-test-container-start.sh
@@ -21,6 +21,10 @@
set -e -x
export PULSAR_STANDALONE_CONF=test-conf/standalone.conf
+# There is an issue when starting Pulsar standalone with another metadata store: https://github.com/apache/pulsar/issues/17984
+# We need to use Zookeeper here. Otherwise the `Message Chunking` test will not pass.
+# We can remove this line once Pulsar releases a new version that contains this fix: https://github.com/apache/pulsar/pull/18126
+export PULSAR_STANDALONE_USE_ZOOKEEPER=1
bin/pulsar-daemon start standalone \
--no-functions-worker --no-stream-storage \
--bookkeeper-dir data/bookkeeper
diff --git a/build-support/pulsar-test-service-start.sh b/build-support/pulsar-test-service-start.sh
index b2e25d5..e897c2b 100755
--- a/build-support/pulsar-test-service-start.sh
+++ b/build-support/pulsar-test-service-start.sh
@@ -25,7 +25,7 @@ cd $SRC_DIR
build-support/pulsar-test-service-stop.sh
-CONTAINER_ID=$(docker run -i -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:latest sleep 3600)
+CONTAINER_ID=$(docker run -i -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:2.10.2 sleep 3600)
echo $CONTAINER_ID >.tests-container-id.txt
diff --git a/index.d.ts b/index.d.ts
index 312f903..5fc5f47 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -60,6 +60,7 @@ export interface ProducerConfig {
publicKeyPath?: string;
encryptionKey?: string;
cryptoFailureAction?: ProducerCryptoFailureAction;
+ chunkingEnabled?: boolean;
}
export class Producer {
@@ -88,6 +89,8 @@ export interface ConsumerConfig {
readCompacted?: boolean;
privateKeyPath?: string;
cryptoFailureAction?: ConsumerCryptoFailureAction;
+ maxPendingChunkedMessage?: number;
+ autoAckOldestChunkedMessageOnQueueFull?: boolean; // boolean flag; the native binding ignores non-boolean values
}
export class Consumer {
diff --git a/package.json b/package.json
index 30bb051..2d60c8c 100644
--- a/package.json
+++ b/package.json
@@ -19,7 +19,7 @@
"dtslint": "dtslint .",
"license:report": "mkdir -p report && grunt grunt-license-report",
"license:addheader": "license-check-and-add",
- "test": "jest --verbose"
+ "test": "jest --verbose --detectOpenHandles"
},
"repository": {
"type": "git",
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index dd48459..13e1a75 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -42,6 +42,9 @@ static const std::string CFG_READ_COMPACTED = "readCompacted";
static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PRIVATE_KEY_PATH = "privateKeyPath";
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
+static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMessage";
+static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
+ "autoAckOldestChunkedMessageOnQueueFull";
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
@@ -194,6 +197,24 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
}
}
}
+
+ if (consumerConfig.Has(CFG_MAX_PENDING_CHUNKED_MESSAGE) &&
+ consumerConfig.Get(CFG_MAX_PENDING_CHUNKED_MESSAGE).IsNumber()) {
+ int32_t maxPendingChunkedMessage =
+ consumerConfig.Get(CFG_MAX_PENDING_CHUNKED_MESSAGE).ToNumber().Int32Value();
+ if (maxPendingChunkedMessage >= 0) {
+ pulsar_consumer_configuration_set_max_pending_chunked_message(this->cConsumerConfig.get(),
+ maxPendingChunkedMessage);
+ }
+ }
+
+ if (consumerConfig.Has(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL) &&
+ consumerConfig.Get(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL).IsBoolean()) {
+ bool autoAckOldestChunkedMessageOnQueueFull =
+ consumerConfig.Get(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL).ToBoolean();
+ pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
+ this->cConsumerConfig.get(), autoAckOldestChunkedMessageOnQueueFull);
+ }
}
ConsumerConfig::~ConsumerConfig() {
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 5170a19..52d7707 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -38,6 +38,7 @@ static const std::string CFG_PROPS = "properties";
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
+static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = {
{"UseSinglePartition", pulsar_UseSinglePartition},
@@ -188,6 +189,11 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig.get(), PRODUCER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
}
}
+
+ if (producerConfig.Has(CFG_CHUNK_ENABLED) && producerConfig.Get(CFG_CHUNK_ENABLED).IsBoolean()) {
+ bool chunkingEnabled = producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
+ pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), chunkingEnabled);
+ }
}
ProducerConfig::~ProducerConfig() {}
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index a5a31cc..b75f725 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -275,3 +275,9 @@ keepAliveIntervalSeconds=30
# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60
+
+### --- BookKeeper Configuration --- #####
+
+# The maximum netty frame size in bytes. Any message received larger than this will be rejected. The default value is 5MB.
+nettyMaxFrameSizeBytes=5253120
+
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 54bba38..66f21df 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -1026,5 +1026,38 @@ const Pulsar = require('../index.js');
await reader.close();
await client.close();
});
+
+ test('Message chunking', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'persistent://public/default/message-chunking';
+ const producer = await client.createProducer({
+ topic,
+ batchingEnabled: false,
+ chunkingEnabled: true,
+ });
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ maxPendingChunkedMessage: 15,
+ autoAckOldestChunkedMessageOnQueueFull: true,
+ });
+
+ const sendMsg = Buffer.alloc(10 * 1024 * 1024);
+
+ await producer.send({
+ data: sendMsg,
+ });
+
+ const receiveMsg = await consumer.receive(3000);
+ expect(receiveMsg.getData().length).toBe(sendMsg.length);
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
});
})();