You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 12:19:42 UTC

[pulsar] branch master updated: Fix the pulsar sql can not query the last message issue (#8635)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 50a81b22 Fix the pulsar sql can not query the last message issue (#8635)
50a81b22 is described below

commit 50a81b22e97fa3717bf4753ec9e52aad9aaa6109
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Nov 20 20:19:16 2020 +0800

    Fix the pulsar sql can not query the last message issue (#8635)
    
    Fixes #7811
    
    ### Motivation
    
    Currently, the LAC (last add confirmed) that the BookKeeper client obtains is the piggybacked LAC, which is carried by the next message. As a result, the Pulsar SQL connector always sees only `message number - 1` messages. To query all messages, we need to use the explicit LAC to get the total number of entries.
    BookKeeper 4.12.0 lets us use the `useV2Protocol` setting to switch to the v3 protocol, which retrieves the explicit LAC.
    
    ### Modifications
    
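    - Add the `pulsar.bookkeeper-use-v2-protocol` and `pulsar.bookkeeper-explicit-interval` properties to the Pulsar SQL connector configuration.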
    - Update the integration test to query the last message.
---
 conf/presto/catalog/pulsar.properties              |  5 +++
 conf/standalone.conf                               |  3 ++
 pom.xml                                            |  1 -
 .../pulsar/sql/presto/PulsarConnectorCache.java    | 21 +++++-----
 .../pulsar/sql/presto/PulsarConnectorConfig.java   | 22 ++++++++++
 .../tests/integration/presto/TestBasicPresto.java  | 13 ++++--
 .../integration/topologies/PulsarCluster.java      | 48 ++++++++++++++--------
 .../integration/topologies/PulsarClusterSpec.java  |  6 +++
 8 files changed, 87 insertions(+), 32 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 0ba4f8d..16ddb04 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -86,6 +86,11 @@ pulsar.bookkeeper-throttle-value = 0
 # default is Runtime.getRuntime().availableProcessors().
 # pulsar.bookkeeper-num-worker-threads =
 
+# Whether the bookkeeper client uses the v2 or the v3 wire protocol.
+# Default is the v2 protocol, in which the LAC is the piggybacked LAC. Otherwise the
+# client uses the v3 protocol and the explicit LAC.
+pulsar.bookkeeper-use-v2-protocol=true
+pulsar.bookkeeper-explicit-interval=0
 
 ####### MANAGED LEDGER CONFIGS #######
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 9deb862..7f8b5fa 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -522,6 +522,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
 # A value of '0' disables sending any explicit LACs. Default is 0.
 bookkeeperExplicitLacIntervalInMills=0
 
+# Use older Bookkeeper wire protocol with bookie
+bookkeeperUseV2WireProtocol=true
+
 # Expose bookkeeper client managed ledger stats to prometheus. default is false
 # bookkeeperClientExposeStatsToPrometheus=false
 
diff --git a/pom.xml b/pom.xml
index 04ae478..02ba4b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,6 @@ flexible messaging model and an intuitive client API.</description>
     <zookeeper.version>3.5.7</zookeeper.version>
     <netty.version>4.1.51.Final</netty.version>
     <netty-tc-native.version>2.0.33.Final</netty-tc-native.version>
-    <storm.version>2.0.0</storm.version>
     <jetty.version>9.4.33.v20201020</jetty.version>
     <jersey.version>2.31</jersey.version>
     <athenz.version>1.8.38</athenz.version>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 496df72..ec0e3b3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -94,16 +94,17 @@ public class PulsarConnectorCache {
     private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
         throws Exception {
         ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-                .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri()
-                        .replace(",", ";") + "/ledgers")
-                .setClientTcpNoDelay(false)
-                .setUseV2WireProtocol(true)
-                .setStickyReadsEnabled(false)
-                .setReadEntryTimeout(60)
-                .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
-                .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
-                .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
+            .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+            .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri()
+                .replace(",", ";") + "/ledgers")
+            .setClientTcpNoDelay(false)
+            .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
+            .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
+            .setStickyReadsEnabled(false)
+            .setReadEntryTimeout(60)
+            .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
+            .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
+            .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
 
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
         managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 72bbb7b..781c39b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -72,6 +72,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int bookkeeperThrottleValue = 0;
     private int bookkeeperNumIOThreads = 2 * Runtime.getRuntime().availableProcessors();
     private int bookkeeperNumWorkerThreads = Runtime.getRuntime().availableProcessors();
+    private boolean bookkeeperUseV2Protocol = true;
+    private int bookkeeperExplicitInterval = 0;
 
     // --- ManagedLedger
     private long managedLedgerCacheSizeMB = 0L;
@@ -334,6 +336,26 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    public boolean getBookkeeperUseV2Protocol() {
+        return bookkeeperUseV2Protocol;
+    }
+
+    @Config("pulsar.bookkeeper-use-v2-protocol")
+    public PulsarConnectorConfig setBookkeeperUseV2Protocol(boolean bookkeeperUseV2Protocol) {
+        this.bookkeeperUseV2Protocol = bookkeeperUseV2Protocol;
+        return this;
+    }
+
+    public int getBookkeeperExplicitInterval() {
+        return bookkeeperExplicitInterval;
+    }
+
+    @Config("pulsar.bookkeeper-explicit-interval")
+    public PulsarConnectorConfig setBookkeeperExplicitInterval(int bookkeeperExplicitInterval) {
+        this.bookkeeperExplicitInterval = bookkeeperExplicitInterval;
+        return this;
+    }
+
     // --- ManagedLedger
     public long getManagedLedgerCacheSizeMB() {
         return managedLedgerCacheSizeMB;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index e29d945..ed09b31 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -27,9 +27,11 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import java.sql.Connection;
@@ -48,6 +50,11 @@ public class TestBasicPresto extends PulsarTestSuite {
 
     private static final int NUM_OF_STOCKS = 10;
 
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        return super.beforeSetupCluster(clusterName, specBuilder.queryLastMessage(true));
+    }
+
     @BeforeClass
     public void setupPresto() throws Exception {
         log.info("[setupPresto]");
@@ -125,11 +132,11 @@ public class TestBasicPresto extends PulsarTestSuite {
         assertThat(containerExecResult.getExitCode()).isEqualTo(0);
         log.info("select sql query output \n{}", containerExecResult.getStdout());
         String[] split = containerExecResult.getStdout().split("\n");
-        assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
+        assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
 
         String[] split2 = containerExecResult.getStdout().split("\n|,");
 
-        for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
+        for (int i = 0; i < NUM_OF_STOCKS; ++i) {
             assertThat(split2).contains("\"" + i + "\"");
             assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
             assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
@@ -164,7 +171,7 @@ public class TestBasicPresto extends PulsarTestSuite {
             returnedTimestamps.add(res.getTimestamp("__publish_time__"));
         }
 
-        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
+        assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2);
 
         // Try with a predicate that has a earlier time than any entry
         // Should return all rows
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index b916eea..0cf328c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -95,13 +95,15 @@ public class PulsarCluster {
 
         if (enablePrestoWorker) {
             prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
-                    .withNetwork(network)
-                    .withNetworkAliases(PrestoWorkerContainer.NAME)
-                    .withEnv("clusterName", clusterName)
-                    .withEnv("zkServers", ZKContainer.NAME)
-                    .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                    .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                    .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+                .withNetwork(network)
+                .withNetworkAliases(PrestoWorkerContainer.NAME)
+                .withEnv("clusterName", clusterName)
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                .withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
+                .withEnv("pulsar.bookkeeper-explicit-interval", "10")
+                .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
         } else {
             prestoWorkerContainer = null;
         }
@@ -150,7 +152,8 @@ public class PulsarCluster {
 
         // create brokers
         brokerContainers.putAll(
-                runNumContainers("broker", spec.numBrokers(), (name) -> new BrokerContainer(clusterName, name)
+            runNumContainers("broker", spec.numBrokers(), (name) -> {
+                    BrokerContainer brokerContainer = new BrokerContainer(clusterName, name)
                         .withNetwork(network)
                         .withNetworkAliases(name)
                         .withEnv("zkServers", ZKContainer.NAME)
@@ -160,9 +163,14 @@ public class PulsarCluster {
                         .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
                         // used in s3 tests
                         .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
-                        .withEnv("AWS_SECRET_KEY", "secretkey")
-                )
-        );
+                        .withEnv("AWS_SECRET_KEY", "secretkey");
+                    if (spec.queryLastMessage) {
+                        brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
+                        brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
+                    }
+                    return brokerContainer;
+                }
+            ));
 
         spec.classPathVolumeMounts.forEach((key, value) -> {
             zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
@@ -339,13 +347,17 @@ public class PulsarCluster {
         log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties);
         if (null == prestoWorkerContainer) {
             prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
-                    .withNetwork(network)
-                    .withNetworkAliases(PrestoWorkerContainer.NAME)
-                    .withEnv("clusterName", clusterName)
-                    .withEnv("zkServers", ZKContainer.NAME)
-                    .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                    .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                    .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+                .withNetwork(network)
+                .withNetworkAliases(PrestoWorkerContainer.NAME)
+                .withEnv("clusterName", clusterName)
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+            if (spec.queryLastMessage) {
+                prestoWorkerContainer.withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
+                    .withEnv("pulsar.bookkeeper-explicit-interval", "10");
+            }
             if (offloadDriver != null && offloadProperties != null) {
                 log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
                         offloadDriver, offloadProperties);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index e1e26ef..1498f37 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -89,6 +89,12 @@ public class PulsarClusterSpec {
     boolean enablePrestoWorker = false;
 
     /**
+     * Whether to allow querying the last message.
+     */
+    @Default
+    boolean queryLastMessage = false;
+
+    /**
      * Returns the function runtime type.
      *
      * @return the function runtime type.