You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:52 UTC

[pulsar] 02/22: [Pulsar SQL] Pulsar SQL support query big entry data (#12448)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b892196ef7b60d489f1fb27fb3c700fae51eb674
Author: ran <ga...@126.com>
AuthorDate: Tue Oct 26 08:24:33 2021 +0800

    [Pulsar SQL] Pulsar SQL support query big entry data (#12448)
    
    (cherry picked from commit a8f0788915393e438e5c38aa9497080fea19d733)
---
 conf/presto/catalog/pulsar.properties              |  3 +-
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  4 +-
 site2/docs/sql-deployment-configurations.md        |  3 +
 .../tests/integration/containers/BKContainer.java  |  1 +
 .../tests/integration/presto/TestBasicPresto.java  | 66 ++++++++++++++--------
 .../presto/TestPrestoQueryTieredStorage.java       |  9 +--
 .../integration/presto/TestPulsarSQLBase.java      | 18 +++---
 .../integration/suites/PulsarSQLTestSuite.java     | 52 +++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  6 +-
 .../integration/topologies/PulsarClusterSpec.java  |  4 ++
 10 files changed, 122 insertions(+), 44 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 1f5a89a..e273b98 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1
 # to prevent erroneous rewriting
 pulsar.namespace-delimiter-rewrite-enable=false
 pulsar.rewrite-namespace-delimiter=/
-
+# max size of one batch message in bytes (default value is 5MB)
+# pulsar.max-message-size=5242880
 
 ####### TIERED STORAGE OFFLOADER CONFIGS #######
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index bf823de..9a64c05 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
@@ -109,7 +110,8 @@ public class PulsarConnectorCache {
             .setReadEntryTimeout(60)
             .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
             .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
-            .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
+            .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
+            .setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
 
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
         managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md
index 1fe0353..e5c402e 100644
--- a/site2/docs/sql-deployment-configurations.md
+++ b/site2/docs/sql-deployment-configurations.md
@@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100
 
 # default number of splits to use per query
 pulsar.target-num-splits=4
+
+# max size of one batch message in bytes (default value is 5MB)
+pulsar.max-message-size=5242880
 ```
 
 You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example.
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
index 36f17cd..b294cac4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
@@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer<BKContainer> {
     public BKContainer(String clusterName, String hostName) {
         super(
             clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
+        tailContainerLog();
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 9104390..62f59c3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -27,7 +27,6 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -35,6 +34,7 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
     private void setupPresto() throws Exception {
         log.info("[TestBasicPresto] setupPresto...");
         pulsarCluster.startPrestoWorker();
+        initJdbcConnection();
     }
 
     private void teardownPresto() {
@@ -161,31 +162,26 @@ public class TestBasicPresto extends TestPulsarSQLBase {
                               boolean useNsOffloadPolices,
                               Schema schema,
                               CompressionType compressionType) throws Exception {
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                .build();
 
         if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
-            prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType);
+            prepareDataForBytesSchema(topicName, isBatch, compressionType);
         } else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
-            prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType);
+            prepareDataForByteBufferSchema(topicName, isBatch, compressionType);
         } else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) {
-            prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType);
+            prepareDataForStringSchema(topicName, isBatch, compressionType);
         } else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
                 || schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
-            prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType);
+            prepareDataForStructSchema(topicName, isBatch, schema, compressionType);
         } else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) {
-            prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType);
+            prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType);
         } else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
-            prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType);
+            prepareDataForKeyValueSchema(topicName, schema, compressionType);
         }
 
         return NUM_OF_STOCKS;
     }
 
-    private void prepareDataForBytesSchema(PulsarClient pulsarClient,
-                                           TopicName topicName,
+    private void prepareDataForBytesSchema(TopicName topicName,
                                            boolean isBatch,
                                            CompressionType compressionType) throws PulsarClientException {
         @Cleanup
@@ -201,8 +197,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         producer.flush();
     }
 
-    private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
-                                                TopicName topicName,
+    private void prepareDataForByteBufferSchema(TopicName topicName,
                                                 boolean isBatch,
                                                 CompressionType compressionType) throws PulsarClientException {
         @Cleanup
@@ -218,8 +213,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         producer.flush();
     }
 
-    private void prepareDataForStringSchema(PulsarClient pulsarClient,
-                                            TopicName topicName,
+    private void prepareDataForStringSchema(TopicName topicName,
                                             boolean isBatch,
                                             CompressionType compressionType) throws PulsarClientException {
         @Cleanup
@@ -235,8 +229,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         producer.flush();
     }
 
-    private void prepareDataForStructSchema(PulsarClient pulsarClient,
-                                            TopicName topicName,
+    private void prepareDataForStructSchema(TopicName topicName,
                                             boolean isBatch,
                                             Schema<Stock> schema,
                                             CompressionType compressionType) throws Exception {
@@ -254,8 +247,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         producer.flush();
     }
 
-    private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
-                                            TopicName topicName,
+    private void prepareDataForProtobufNativeSchema(TopicName topicName,
                                             boolean isBatch,
                                             Schema<StockProtoMessage.Stock> schema,
                                             CompressionType compressionType) throws Exception {
@@ -274,8 +266,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         producer.flush();
     }
 
-    private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
-                                              TopicName topicName,
+    private void prepareDataForKeyValueSchema(TopicName topicName,
                                               Schema<KeyValue<Stock, Stock>> schema,
                                               CompressionType compressionType) throws Exception {
         @Cleanup
@@ -342,4 +333,33 @@ public class TestBasicPresto extends TestPulsarSQLBase {
         }
     }
 
+    @Test(timeOut = 1000 * 30)
+    public void testQueueBigEntry() throws Exception {
+        String tableName = "big_data_" + randomName(5);
+        String topic = "persistent://public/default/" + tableName;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        // Make sure that the data length is bigger than the default maxMessageSize
+        int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024;
+        Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize());
+        byte[] data = new byte[dataLength];
+        for (int i = 0; i < dataLength; i++) {
+            data[i] = 'a';
+        }
+
+        int messageCnt = 5;
+        log.info("start produce big entry data, data length: {}", dataLength);
+        for (int i = 0 ; i < messageCnt; ++i) {
+            producer.newMessage().value(data).send();
+        }
+
+        int count = selectCount("public/default", tableName);
+        Assert.assertEquals(count, messageCnt);
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 881dbe4..4c129fa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -40,8 +39,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.containers.S3Container;
 import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
@@ -89,6 +86,7 @@ public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
         String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT);
         pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
         pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
+        initJdbcConnection();
     }
 
     private String getOffloadProperties(String bucket, String region, String endpoint) {
@@ -136,11 +134,6 @@ public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
                               Schema schema,
                               CompressionType compressionType) throws Exception {
         @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder()
-                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                .build();
-
-        @Cleanup
         Consumer<Stock> consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class))
                 .topic(topicName.toString())
                 .subscriptionName("test")
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 026a32d..0626e35 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.tests.integration.presto;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.base.Stopwatch;
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -196,9 +194,6 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         );
 
         // test predicate pushdown
-        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
-        Connection connection = DriverManager.getConnection(url, "test", null);
-
         String query = String.format("select * from pulsar" +
                 ".\"%s\".\"%s\" order by __publish_time__", namespace, topic);
         log.info("Executing query: {}", query);
@@ -267,11 +262,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
         log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size());
         assertThat(returnedTimestamps.size()).isEqualTo(0);
 
-        query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-        res.next();
-        int count = res.getInt("_col0");
+        int count = selectCount(namespace, topic);
         assertThat(count).isGreaterThan(messageNum - 2);
     }
 
@@ -304,5 +295,12 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
 
     }
 
+    protected int selectCount(String namespace, String tableName) throws SQLException {
+        String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName);
+        log.info("Executing count query: {}", query);
+        ResultSet res = connection.createStatement().executeQuery(query);
+        res.next();
+        return res.getInt("_col0");
+    }
 
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
index 9ed7335..762fff7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
@@ -18,13 +18,22 @@
  */
 package org.apache.pulsar.tests.integration.suites;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.S3Container;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 
+/**
+ * Pulsar SQL test suite.
+ */
 @Slf4j
 public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
 
@@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
     public static final String BUCKET = "pulsar-integtest";
     public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090";
 
+    protected Connection connection = null;
+    protected PulsarClient pulsarClient = null;
+
     @Override
     protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
         specBuilder.queryLastMessage(true);
         specBuilder.clusterName("pulsar-sql-test");
         specBuilder.numBrokers(1);
+        specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE);
         return super.beforeSetupCluster(clusterName, specBuilder);
     }
 
@@ -55,4 +68,43 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
         }
     }
 
+    @Override
+    public void setupCluster() throws Exception {
+        super.setupCluster();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+    }
+
+    protected void initJdbcConnection() throws SQLException {
+        if (pulsarCluster.getPrestoWorkerContainer() == null) {
+            log.error("The presto work container isn't exist.");
+            return;
+        }
+        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
+        connection = DriverManager.getConnection(url, "test", null);
+    }
+
+    @Override
+    public void tearDownCluster() throws Exception {
+        close();
+        super.tearDownCluster();
+    }
+
+    protected void close() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                log.error("Failed to close sql connection.", e);
+            }
+        }
+        if (pulsarClient != null) {
+            try {
+                pulsarClient.close();
+            } catch (PulsarClientException e) {
+                log.error("Failed to close pulsar client.", e);
+            }
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 2191799..7117cbb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -80,6 +80,7 @@ public class PulsarCluster {
         return new PulsarCluster(spec, csContainer, true);
     }
 
+    @Getter
     private final PulsarClusterSpec spec;
 
     @Getter
@@ -157,6 +158,7 @@ public class PulsarCluster {
                         .withEnv("journalMaxGroupWaitMSec", "0")
                         .withEnv("clusterName", clusterName)
                         .withEnv("diskUsageThreshold", "0.99")
+                        .withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)
                 )
         );
 
@@ -173,7 +175,8 @@ public class PulsarCluster {
                         .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
                         // used in s3 tests
                         .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
-                        .withEnv("AWS_SECRET_KEY", "secretkey");
+                        .withEnv("AWS_SECRET_KEY", "secretkey")
+                        .withEnv("maxMessageSize", "" + spec.maxMessageSize);
                     if (spec.queryLastMessage) {
                         brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
                         brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
@@ -431,6 +434,7 @@ public class PulsarCluster {
                 .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
                 .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
                 .withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080")
+                .withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize)
                 .withClasspathResourceMapping(
                         resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
         if (spec.queryLastMessage) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 9dcfcfb..eed6042 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -29,6 +29,7 @@ import lombok.Setter;
 import lombok.Singular;
 import lombok.experimental.Accessors;
 
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.testcontainers.containers.GenericContainer;
 
@@ -152,4 +153,7 @@ public class PulsarClusterSpec {
      * Specify mount files.
      */
     Map<String, String> brokerMountFiles;
+
+    @Default
+    int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
 }