You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/12/08 13:12:47 UTC

[pulsar-client-node] branch master updated: feat: Add chunking support (#234)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 7957253  feat: Add chunking support (#234)
7957253 is described below

commit 79572533c192f288124a5211377a7e76acdc2554
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Dec 8 21:12:42 2022 +0800

    feat: Add chunking support (#234)
---
 build-support/pulsar-test-container-start.sh |  4 ++++
 build-support/pulsar-test-service-start.sh   |  2 +-
 index.d.ts                                   |  3 +++
 package.json                                 |  2 +-
 src/ConsumerConfig.cc                        | 21 ++++++++++++++++++
 src/ProducerConfig.cc                        |  6 +++++
 tests/conf/standalone.conf                   |  6 +++++
 tests/end_to_end.test.js                     | 33 ++++++++++++++++++++++++++++
 8 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/build-support/pulsar-test-container-start.sh b/build-support/pulsar-test-container-start.sh
index f51bfeb..96c7758 100755
--- a/build-support/pulsar-test-container-start.sh
+++ b/build-support/pulsar-test-container-start.sh
@@ -21,6 +21,10 @@
 set -e -x
 
 export PULSAR_STANDALONE_CONF=test-conf/standalone.conf
+# There is an issue when starting the Pulsar standalone with a metadata store other than ZooKeeper: https://github.com/apache/pulsar/issues/17984
+# We need to use ZooKeeper here. Otherwise, the `Message chunking` test will not pass.
+# We can remove this line once Pulsar releases a new version that contains this fix: https://github.com/apache/pulsar/pull/18126
+export PULSAR_STANDALONE_USE_ZOOKEEPER=1
 bin/pulsar-daemon start standalone \
         --no-functions-worker --no-stream-storage \
         --bookkeeper-dir data/bookkeeper
diff --git a/build-support/pulsar-test-service-start.sh b/build-support/pulsar-test-service-start.sh
index b2e25d5..e897c2b 100755
--- a/build-support/pulsar-test-service-start.sh
+++ b/build-support/pulsar-test-service-start.sh
@@ -25,7 +25,7 @@ cd $SRC_DIR
 
 build-support/pulsar-test-service-stop.sh
 
-CONTAINER_ID=$(docker run -i -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:latest sleep 3600)
+CONTAINER_ID=$(docker run -i -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:2.10.2 sleep 3600)
 
 echo $CONTAINER_ID >.tests-container-id.txt
 
diff --git a/index.d.ts b/index.d.ts
index 312f903..5fc5f47 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -60,6 +60,7 @@ export interface ProducerConfig {
   publicKeyPath?: string;
   encryptionKey?: string;
   cryptoFailureAction?: ProducerCryptoFailureAction;
+  chunkingEnabled?: boolean;
 }
 
 export class Producer {
@@ -88,6 +89,8 @@ export interface ConsumerConfig {
   readCompacted?: boolean;
   privateKeyPath?: string;
   cryptoFailureAction?: ConsumerCryptoFailureAction;
+  maxPendingChunkedMessage?: number;
+  autoAckOldestChunkedMessageOnQueueFull?: boolean;
 }
 
 export class Consumer {
diff --git a/package.json b/package.json
index 30bb051..2d60c8c 100644
--- a/package.json
+++ b/package.json
@@ -19,7 +19,7 @@
     "dtslint": "dtslint .",
     "license:report": "mkdir -p report && grunt grunt-license-report",
     "license:addheader": "license-check-and-add",
-    "test": "jest --verbose"
+    "test": "jest --verbose --detectOpenHandles"
   },
   "repository": {
     "type": "git",
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index dd48459..13e1a75 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -42,6 +42,9 @@ static const std::string CFG_READ_COMPACTED = "readCompacted";
 static const std::string CFG_SCHEMA = "schema";
 static const std::string CFG_PRIVATE_KEY_PATH = "privateKeyPath";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
+static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMessage";
+static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
+    "autoAckOldestChunkedMessageOnQueueFull";
 
 static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Exclusive", pulsar_ConsumerExclusive},
@@ -194,6 +197,24 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
       }
     }
   }
+
+  if (consumerConfig.Has(CFG_MAX_PENDING_CHUNKED_MESSAGE) &&
+      consumerConfig.Get(CFG_MAX_PENDING_CHUNKED_MESSAGE).IsNumber()) {
+    int32_t maxPendingChunkedMessage =
+        consumerConfig.Get(CFG_MAX_PENDING_CHUNKED_MESSAGE).ToNumber().Int32Value();
+    if (maxPendingChunkedMessage >= 0) {
+      pulsar_consumer_configuration_set_max_pending_chunked_message(this->cConsumerConfig.get(),
+                                                                    maxPendingChunkedMessage);
+    }
+  }
+
+  if (consumerConfig.Has(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL) &&
+      consumerConfig.Get(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL).IsBoolean()) {
+    bool autoAckOldestChunkedMessageOnQueueFull =
+        consumerConfig.Get(CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL).ToBoolean();
+    pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
+        this->cConsumerConfig.get(), autoAckOldestChunkedMessageOnQueueFull);
+  }
 }
 
 ConsumerConfig::~ConsumerConfig() {
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 5170a19..52d7707 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -38,6 +38,7 @@ static const std::string CFG_PROPS = "properties";
 static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
 static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
+static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
 
 static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = {
     {"UseSinglePartition", pulsar_UseSinglePartition},
@@ -188,6 +189,11 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
             this->cProducerConfig.get(), PRODUCER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
     }
   }
+
+  if (producerConfig.Has(CFG_CHUNK_ENABLED) && producerConfig.Get(CFG_CHUNK_ENABLED).IsBoolean()) {
+    bool chunkingEnabled = producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
+    pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), chunkingEnabled);
+  }
 }
 
 ProducerConfig::~ProducerConfig() {}
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index a5a31cc..b75f725 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -275,3 +275,9 @@ keepAliveIntervalSeconds=30
 
 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
 brokerServicePurgeInactiveFrequencyInSeconds=60
+
+### --- BookKeeper Configuration --- #####
+
+# The maximum netty frame size in bytes. Any message received larger than this will be rejected. The default value is 5MB.
+nettyMaxFrameSizeBytes=5253120
+
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 54bba38..66f21df 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -1026,5 +1026,38 @@ const Pulsar = require('../index.js');
       await reader.close();
       await client.close();
     });
+
+    test('Message chunking', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/message-chunking';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+        chunkingEnabled: true,
+      });
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        maxPendingChunkedMessage: 15,
+        autoAckOldestChunkedMessageOnQueueFull: true,
+      });
+
+      const sendMsg = Buffer.alloc(10 * 1024 * 1024);
+
+      await producer.send({
+        data: sendMsg,
+      });
+
+      const receiveMsg = await consumer.receive(3000);
+      expect(receiveMsg.getData().length).toBe(sendMsg.length);
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
   });
 })();