You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 12:19:42 UTC
[pulsar] branch master updated: Fix the pulsar sql can not query
the last message issue (#8635)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 50a81b22 Fix the pulsar sql can not query the last message issue (#8635)
50a81b22 is described below
commit 50a81b22e97fa3717bf4753ec9e52aad9aaa6109
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Nov 20 20:19:16 2020 +0800
Fix the pulsar sql can not query the last message issue (#8635)
Fixes #7811
### Motivation
Currently, the LAC (Last Add Confirmed) that the BookKeeper client obtains is the piggybacked LAC, which is carried by the next message. As a result, the Pulsar SQL connector always returns `message number - 1` messages, i.e. the last message can never be queried. To query all messages, we need to use the explicit LAC to get the total number of entries.
BookKeeper 4.12.0 lets us disable `useV2Protocol` so that the client uses the v3 protocol, which provides the explicit LAC.
### Modifications
- Add the properties for the pulsar SQL.
- Update the integration test to query the last message.
---
conf/presto/catalog/pulsar.properties | 5 +++
conf/standalone.conf | 3 ++
pom.xml | 1 -
.../pulsar/sql/presto/PulsarConnectorCache.java | 21 +++++-----
.../pulsar/sql/presto/PulsarConnectorConfig.java | 22 ++++++++++
.../tests/integration/presto/TestBasicPresto.java | 13 ++++--
.../integration/topologies/PulsarCluster.java | 48 ++++++++++++++--------
.../integration/topologies/PulsarClusterSpec.java | 6 +++
8 files changed, 87 insertions(+), 32 deletions(-)
diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 0ba4f8d..16ddb04 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -86,6 +86,11 @@ pulsar.bookkeeper-throttle-value = 0
# default is Runtime.getRuntime().availableProcessors().
# pulsar.bookkeeper-num-worker-threads =
+# Whether the BookKeeper client uses the v2 or the v3 wire protocol.
+# Default is the v2 protocol, where the LAC is piggybacked on the next entry. If set to false, the client
+# uses the v3 protocol and the explicit LAC (see pulsar.bookkeeper-explicit-interval; 0 disables it).
+pulsar.bookkeeper-use-v2-protocol=true
+pulsar.bookkeeper-explicit-interval=0
####### MANAGED LEDGER CONFIGS #######
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 9deb862..7f8b5fa 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -522,6 +522,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
# A value of '0' disables sending any explicit LACs. Default is 0.
bookkeeperExplicitLacIntervalInMills=0
+# Use the older (v2) BookKeeper wire protocol when talking to bookies; set to false to use the v3 protocol
+bookkeeperUseV2WireProtocol=true
+
# Expose bookkeeper client managed ledger stats to prometheus. default is false
# bookkeeperClientExposeStatsToPrometheus=false
diff --git a/pom.xml b/pom.xml
index 04ae478..02ba4b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@ flexible messaging model and an intuitive client API.</description>
<zookeeper.version>3.5.7</zookeeper.version>
<netty.version>4.1.51.Final</netty.version>
<netty-tc-native.version>2.0.33.Final</netty-tc-native.version>
- <storm.version>2.0.0</storm.version>
<jetty.version>9.4.33.v20201020</jetty.version>
<jersey.version>2.31</jersey.version>
<athenz.version>1.8.38</athenz.version>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 496df72..ec0e3b3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -94,16 +94,17 @@ public class PulsarConnectorCache {
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
- .setZkServers(pulsarConnectorConfig.getZookeeperUri())
- .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri()
- .replace(",", ";") + "/ledgers")
- .setClientTcpNoDelay(false)
- .setUseV2WireProtocol(true)
- .setStickyReadsEnabled(false)
- .setReadEntryTimeout(60)
- .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
- .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
- .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
+ .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+ .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri()
+ .replace(",", ";") + "/ledgers")
+ .setClientTcpNoDelay(false)
+ .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
+ .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
+ .setStickyReadsEnabled(false)
+ .setReadEntryTimeout(60)
+ .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
+ .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
+ .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 72bbb7b..781c39b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -72,6 +72,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int bookkeeperThrottleValue = 0;
private int bookkeeperNumIOThreads = 2 * Runtime.getRuntime().availableProcessors();
private int bookkeeperNumWorkerThreads = Runtime.getRuntime().availableProcessors();
+ private boolean bookkeeperUseV2Protocol = true;
+ private int bookkeeperExplicitInterval = 0;
// --- ManagedLedger
private long managedLedgerCacheSizeMB = 0L;
@@ -334,6 +336,26 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
+ public boolean getBookkeeperUseV2Protocol() {
+ return bookkeeperUseV2Protocol;
+ }
+
+ @Config("pulsar.bookkeeper-use-v2-protocol")
+ public PulsarConnectorConfig setBookkeeperUseV2Protocol(boolean bookkeeperUseV2Protocol) {
+ this.bookkeeperUseV2Protocol = bookkeeperUseV2Protocol;
+ return this;
+ }
+
+ public int getBookkeeperExplicitInterval() {
+ return bookkeeperExplicitInterval;
+ }
+
+ @Config("pulsar.bookkeeper-explicit-interval")
+ public PulsarConnectorConfig setBookkeeperExplicitInterval(int bookkeeperExplicitInterval) {
+ this.bookkeeperExplicitInterval = bookkeeperExplicitInterval;
+ return this;
+ }
+
// --- ManagedLedger
public long getManagedLedgerCacheSizeMB() {
return managedLedgerCacheSizeMB;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index e29d945..ed09b31 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -27,9 +27,11 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import java.sql.Connection;
@@ -48,6 +50,11 @@ public class TestBasicPresto extends PulsarTestSuite {
private static final int NUM_OF_STOCKS = 10;
+ @Override
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+ return super.beforeSetupCluster(clusterName, specBuilder.queryLastMessage(true));
+ }
+
@BeforeClass
public void setupPresto() throws Exception {
log.info("[setupPresto]");
@@ -125,11 +132,11 @@ public class TestBasicPresto extends PulsarTestSuite {
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
- assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
+ assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
String[] split2 = containerExecResult.getStdout().split("\n|,");
- for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
+ for (int i = 0; i < NUM_OF_STOCKS; ++i) {
assertThat(split2).contains("\"" + i + "\"");
assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
@@ -164,7 +171,7 @@ public class TestBasicPresto extends PulsarTestSuite {
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
- assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
+ assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2);
// Try with a predicate that has a earlier time than any entry
// Should return all rows
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index b916eea..0cf328c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -95,13 +95,15 @@ public class PulsarCluster {
if (enablePrestoWorker) {
prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
- .withNetwork(network)
- .withNetworkAliases(PrestoWorkerContainer.NAME)
- .withEnv("clusterName", clusterName)
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
- .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
- .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+ .withNetwork(network)
+ .withNetworkAliases(PrestoWorkerContainer.NAME)
+ .withEnv("clusterName", clusterName)
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+ .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+ .withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
+ .withEnv("pulsar.bookkeeper-explicit-interval", "10")
+ .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
} else {
prestoWorkerContainer = null;
}
@@ -150,7 +152,8 @@ public class PulsarCluster {
// create brokers
brokerContainers.putAll(
- runNumContainers("broker", spec.numBrokers(), (name) -> new BrokerContainer(clusterName, name)
+ runNumContainers("broker", spec.numBrokers(), (name) -> {
+ BrokerContainer brokerContainer = new BrokerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
.withEnv("zkServers", ZKContainer.NAME)
@@ -160,9 +163,14 @@ public class PulsarCluster {
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
- .withEnv("AWS_SECRET_KEY", "secretkey")
- )
- );
+ .withEnv("AWS_SECRET_KEY", "secretkey");
+ if (spec.queryLastMessage) {
+ brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
+ brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
+ }
+ return brokerContainer;
+ }
+ ));
spec.classPathVolumeMounts.forEach((key, value) -> {
zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
@@ -339,13 +347,17 @@ public class PulsarCluster {
log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties);
if (null == prestoWorkerContainer) {
prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
- .withNetwork(network)
- .withNetworkAliases(PrestoWorkerContainer.NAME)
- .withEnv("clusterName", clusterName)
- .withEnv("zkServers", ZKContainer.NAME)
- .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
- .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
- .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+ .withNetwork(network)
+ .withNetworkAliases(PrestoWorkerContainer.NAME)
+ .withEnv("clusterName", clusterName)
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+ .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+ .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+ if (spec.queryLastMessage) {
+ prestoWorkerContainer.withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
+ .withEnv("pulsar.bookkeeper-explicit-interval", "10");
+ }
if (offloadDriver != null && offloadProperties != null) {
log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
offloadDriver, offloadProperties);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index e1e26ef..1498f37 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -89,6 +89,12 @@ public class PulsarClusterSpec {
boolean enablePrestoWorker = false;
/**
+ * Allow to query the last message
+ */
+ @Default
+ boolean queryLastMessage = false;
+
+ /**
* Returns the function runtime type.
*
* @return the function runtime type.