You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:56:52 UTC
[pulsar] 02/22: [Pulsar SQL] Pulsar SQL support query big entry data (#12448)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b892196ef7b60d489f1fb27fb3c700fae51eb674
Author: ran <ga...@126.com>
AuthorDate: Tue Oct 26 08:24:33 2021 +0800
[Pulsar SQL] Pulsar SQL support query big entry data (#12448)
(cherry picked from commit a8f0788915393e438e5c38aa9497080fea19d733)
---
conf/presto/catalog/pulsar.properties | 3 +-
.../pulsar/sql/presto/PulsarConnectorCache.java | 4 +-
site2/docs/sql-deployment-configurations.md | 3 +
.../tests/integration/containers/BKContainer.java | 1 +
.../tests/integration/presto/TestBasicPresto.java | 66 ++++++++++++++--------
.../presto/TestPrestoQueryTieredStorage.java | 9 +--
.../integration/presto/TestPulsarSQLBase.java | 18 +++---
.../integration/suites/PulsarSQLTestSuite.java | 52 +++++++++++++++++
.../integration/topologies/PulsarCluster.java | 6 +-
.../integration/topologies/PulsarClusterSpec.java | 4 ++
10 files changed, 122 insertions(+), 44 deletions(-)
diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 1f5a89a..e273b98 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1
# to prevent erroneous rewriting
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/
-
+# max size of one batch message (default value is 5MB)
+# pulsar.max-message-size=5242880
####### TIERED STORAGE OFFLOADER CONFIGS #######
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index bf823de..9a64c05 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -109,7 +110,8 @@ public class PulsarConnectorCache {
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
- .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
+ .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
+ .setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
diff --git a/site2/docs/sql-deployment-configurations.md b/site2/docs/sql-deployment-configurations.md
index 1fe0353..e5c402e 100644
--- a/site2/docs/sql-deployment-configurations.md
+++ b/site2/docs/sql-deployment-configurations.md
@@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100
# default number of splits to use per query
pulsar.target-num-splits=4
+
+# max size of one batch message (default value is 5MB)
+pulsar.max-message-size=5242880
```
You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example.
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
index 36f17cd..b294cac4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/BKContainer.java
@@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer<BKContainer> {
public BKContainer(String clusterName, String hostName) {
super(
clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
+ tailContainerLog();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 9104390..62f59c3 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -27,7 +27,6 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -35,6 +34,7 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
@@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
private void setupPresto() throws Exception {
log.info("[TestBasicPresto] setupPresto...");
pulsarCluster.startPrestoWorker();
+ initJdbcConnection();
}
private void teardownPresto() {
@@ -161,31 +162,26 @@ public class TestBasicPresto extends TestPulsarSQLBase {
boolean useNsOffloadPolices,
Schema schema,
CompressionType compressionType) throws Exception {
- @Cleanup
- PulsarClient pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
- prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType);
+ prepareDataForBytesSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
- prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType);
+ prepareDataForByteBufferSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) {
- prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType);
+ prepareDataForStringSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
|| schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
- prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType);
+ prepareDataForStructSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) {
- prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType);
+ prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
- prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType);
+ prepareDataForKeyValueSchema(topicName, schema, compressionType);
}
return NUM_OF_STOCKS;
}
- private void prepareDataForBytesSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForBytesSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -201,8 +197,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.flush();
}
- private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForByteBufferSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -218,8 +213,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.flush();
}
- private void prepareDataForStringSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForStringSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
@@ -235,8 +229,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.flush();
}
- private void prepareDataForStructSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForStructSchema(TopicName topicName,
boolean isBatch,
Schema<Stock> schema,
CompressionType compressionType) throws Exception {
@@ -254,8 +247,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.flush();
}
- private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForProtobufNativeSchema(TopicName topicName,
boolean isBatch,
Schema<StockProtoMessage.Stock> schema,
CompressionType compressionType) throws Exception {
@@ -274,8 +266,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
producer.flush();
}
- private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
- TopicName topicName,
+ private void prepareDataForKeyValueSchema(TopicName topicName,
Schema<KeyValue<Stock, Stock>> schema,
CompressionType compressionType) throws Exception {
@Cleanup
@@ -342,4 +333,33 @@ public class TestBasicPresto extends TestPulsarSQLBase {
}
}
+ @Test(timeOut = 1000 * 30)
+ public void testQueueBigEntry() throws Exception {
+ String tableName = "big_data_" + randomName(5);
+ String topic = "persistent://public/default/" + tableName;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ // Make sure that the data length bigger than the default maxMessageSize
+ int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024;
+ Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize());
+ byte[] data = new byte[dataLength];
+ for (int i = 0; i < dataLength; i++) {
+ data[i] = 'a';
+ }
+
+ int messageCnt = 5;
+ log.info("start produce big entry data, data length: {}", dataLength);
+ for (int i = 0 ; i < messageCnt; ++i) {
+ producer.newMessage().value(data).send();
+ }
+
+ int count = selectCount("public/default", tableName);
+ Assert.assertEquals(count, messageCnt);
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 881dbe4..4c129fa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -40,8 +39,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -89,6 +86,7 @@ public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT);
pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
+ initJdbcConnection();
}
private String getOffloadProperties(String bucket, String region, String endpoint) {
@@ -136,11 +134,6 @@ public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
Schema schema,
CompressionType compressionType) throws Exception {
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
- .build();
-
- @Cleanup
Consumer<Stock> consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class))
.topic(topicName.toString())
.subscriptionName("test")
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 026a32d..0626e35 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -21,8 +21,6 @@ package org.apache.pulsar.tests.integration.presto;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.base.Stopwatch;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -196,9 +194,6 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
);
// test predicate pushdown
- String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
- Connection connection = DriverManager.getConnection(url, "test", null);
-
String query = String.format("select * from pulsar" +
".\"%s\".\"%s\" order by __publish_time__", namespace, topic);
log.info("Executing query: {}", query);
@@ -267,11 +262,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size());
assertThat(returnedTimestamps.size()).isEqualTo(0);
- query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
- log.info("Executing query: {}", query);
- res = connection.createStatement().executeQuery(query);
- res.next();
- int count = res.getInt("_col0");
+ int count = selectCount(namespace, topic);
assertThat(count).isGreaterThan(messageNum - 2);
}
@@ -304,5 +295,12 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
}
+ protected int selectCount(String namespace, String tableName) throws SQLException {
+ String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName);
+ log.info("Executing count query: {}", query);
+ ResultSet res = connection.createStatement().executeQuery(query);
+ res.next();
+ return res.getInt("_col0");
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
index 9ed7335..762fff7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
@@ -18,13 +18,22 @@
*/
package org.apache.pulsar.tests.integration.suites;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+/**
+ * Pulsar SQL test suite.
+ */
@Slf4j
public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
@@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
public static final String BUCKET = "pulsar-integtest";
public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090";
+ protected Connection connection = null;
+ protected PulsarClient pulsarClient = null;
+
@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.queryLastMessage(true);
specBuilder.clusterName("pulsar-sql-test");
specBuilder.numBrokers(1);
+ specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE);
return super.beforeSetupCluster(clusterName, specBuilder);
}
@@ -55,4 +68,43 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
}
}
+ @Override
+ public void setupCluster() throws Exception {
+ super.setupCluster();
+ pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ }
+
+ protected void initJdbcConnection() throws SQLException {
+ if (pulsarCluster.getPrestoWorkerContainer() == null) {
+ log.error("The presto work container isn't exist.");
+ return;
+ }
+ String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
+ connection = DriverManager.getConnection(url, "test", null);
+ }
+
+ @Override
+ public void tearDownCluster() throws Exception {
+ close();
+ super.tearDownCluster();
+ }
+
+ protected void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ log.error("Failed to close sql connection.", e);
+ }
+ }
+ if (pulsarClient != null) {
+ try {
+ pulsarClient.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close pulsar client.", e);
+ }
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 2191799..7117cbb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -80,6 +80,7 @@ public class PulsarCluster {
return new PulsarCluster(spec, csContainer, true);
}
+ @Getter
private final PulsarClusterSpec spec;
@Getter
@@ -157,6 +158,7 @@ public class PulsarCluster {
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("diskUsageThreshold", "0.99")
+ .withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)
)
);
@@ -173,7 +175,8 @@ public class PulsarCluster {
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
- .withEnv("AWS_SECRET_KEY", "secretkey");
+ .withEnv("AWS_SECRET_KEY", "secretkey")
+ .withEnv("maxMessageSize", "" + spec.maxMessageSize);
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
@@ -431,6 +434,7 @@ public class PulsarCluster {
.withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080")
+ .withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize)
.withClasspathResourceMapping(
resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
if (spec.queryLastMessage) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 9dcfcfb..eed6042 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -29,6 +29,7 @@ import lombok.Setter;
import lombok.Singular;
import lombok.experimental.Accessors;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.testcontainers.containers.GenericContainer;
@@ -152,4 +153,7 @@ public class PulsarClusterSpec {
* Specify mount files.
*/
Map<String, String> brokerMountFiles;
+
+ @Default
+ int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
}