You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/30 08:54:33 UTC

[pulsar] branch branch-2.7 updated (d1579b7 -> cb55dae)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from d1579b7  Issue 9082: Broker expires messages one at a time after topic unload (#9083)
     new 0159495  [Pulsar SQL] Fix OffloadPolicies json serialization error in Pulsar SQL (#9300)
     new 41efa5d  [Admin] Expose offloaded storage size to the admin stats (#9335)
     new 3b5dd97  [fix #7814] fix java function logging appender not added to java function logger (#9299)
     new cb55dae  Expose more info with unknown exception (#9323)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/pulsar/Dockerfile                           |   1 +
 ...env.py => apply-config-from-env-with-prefix.py} |  18 +-
 .../bookkeeper/mledger/ManagedLedgerException.java |   6 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   3 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   6 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |   5 +
 .../common/policies/data/OffloadPolicies.java      |  29 ++-
 .../pulsar/common/policies/data/TopicStats.java    |   5 +
 .../policies/data/PersistentTopicStatsTest.java    |   3 +
 .../functions/instance/JavaInstanceRunnable.java   |  22 +-
 .../pulsar/sql/presto/PulsarSplitManager.java      |  26 +-
 .../pulsar/sql/presto/TestPulsarSplitManager.java  |  81 ++++++
 .../scripts/run-presto-worker.sh                   |   3 +-
 .../containers/PrestoWorkerContainer.java          |  11 +-
 .../integration/containers/PulsarContainer.java    |   2 +-
 .../integration/functions/PulsarFunctionsTest.java | 152 +++++++++++
 .../functions/PulsarFunctionsTestBase.java         |   3 +
 .../tests/integration/presto/TestBasicPresto.java  | 198 ++-------------
 .../presto/TestPrestoQueryTieredStorage.java       | 279 +++++----------------
 .../integration/presto/TestPulsarSQLBase.java      | 249 ++++++++++++++++++
 ...orageTestSuite.java => PulsarSQLTestSuite.java} |  59 ++---
 .../integration/topologies/PulsarCluster.java      | 100 +++++---
 .../resources/presto-coordinator-config.properties |   4 +-
 .../presto-follow-worker-config.properties         |  20 +-
 .../integration/src/test/resources/pulsar-sql.xml  |   1 +
 26 files changed, 784 insertions(+), 504 deletions(-)
 copy docker/pulsar/scripts/{apply-config-from-env.py => apply-config-from-env-with-prefix.py} (89%)
 create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
 copy tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/{PulsarTieredStorageTestSuite.java => PulsarSQLTestSuite.java} (51%)
 copy conf/presto/config.properties => tests/integration/src/test/resources/presto-coordinator-config.properties (95%)
 copy pulsar-io/influxdb/src/test/resources/sinkConfig-v1.yaml => tests/integration/src/test/resources/presto-follow-worker-config.properties (81%)


[pulsar] 04/04: Expose more info with unknown exception (#9323)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cb55daeec3e0c376f1444a6dd8849c70c0813605
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Fri Jan 29 13:10:00 2021 +0800

    Expose more info with unknown exception (#9323)
    
    * Expose more info with unknown exception
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * polish
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * polish
    
    Signed-off-by: Renkai <ga...@gmail.com>
    
    * drop {} to enable detail exception
    
    Signed-off-by: Renkai <ga...@gmail.com>
    (cherry picked from commit 4896192be81c6b1e2c67ec4ad7cf12736746cf1b)
---
 .../java/org/apache/bookkeeper/mledger/ManagedLedgerException.java  | 6 +++++-
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 2 +-
 .../main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java   | 6 ++----
 3 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 14202cb..32f1992 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -23,7 +23,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability;
 
 @InterfaceAudience.LimitedPrivate
 @InterfaceStability.Stable
-@SuppressWarnings({"serial", "checkstyle:javadoctype"})
+@SuppressWarnings({ "serial", "checkstyle:javadoctype" })
 public class ManagedLedgerException extends Exception {
     public ManagedLedgerException(String msg) {
         super(msg);
@@ -33,6 +33,10 @@ public class ManagedLedgerException extends Exception {
         super(e);
     }
 
+    public ManagedLedgerException(String msg, Throwable e) {
+        super(msg, e);
+    }
+
     public static ManagedLedgerException getManagedLedgerException(Throwable e) {
         if (e instanceof ManagedLedgerException) {
             return (ManagedLedgerException) e;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6967a20..1f37e86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3254,7 +3254,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return createManagedLedgerException(t.getCause());
         } else {
             log.error("Unknown exception for ManagedLedgerException.", t);
-            return new ManagedLedgerException("Unknown exception");
+            return new ManagedLedgerException("Other exception", t);
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 8dd52a2..36f2704 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -19,12 +19,10 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
 import com.google.common.collect.Lists;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.List;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -110,8 +108,8 @@ class OpReadEntry implements ReadEntriesCallback {
             checkReadCompletion();
         } else {
             if (!(exception instanceof TooManyRequestsException)) {
-                log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(),
-                        cursor.getName(), readPosition, exception.getMessage());
+                log.warn("[{}][{}] read failed from ledger at position:{}", cursor.ledger.getName(),
+                        cursor.getName(), readPosition, exception);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] read throttled failed from ledger at position:{}", cursor.ledger.getName(),


[pulsar] 03/04: [fix #7814] fix java function logging appender not added to java function logger (#9299)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3b5dd9727de19d21470f000d8ab817a79d3e6602
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Jan 29 14:09:59 2021 +0800

    [fix #7814] fix java function logging appender not added to java function logger (#9299)
    
    Fixes #7814
    
    `JavaInstanceRunnable` creates a logger instance named `"function-" + instanceConfig.getFunctionDetails().getName()` and passes it to the `Function Context`; the logger can be used to send user-defined content to the function's log topic if `--log-topic` is defined.
    
    As issue #7814 mentions, the logger does not work as expected: users cannot consume any self-defined log content from the `log-topic`.
    
    This happens in the process runtime with created functions, but has not been noticed in other situations such as a `localrun` function.
    
    Debugging the created function shows that the logger in `Function Context` is different from the logger in `JavaInstanceRunnable` (for example, the `contextName` differs, as the images below show). In addition, the `LogAppender` set in `JavaInstanceRunnable` does not appear in the `Function Context`'s logger either.
    
    ![Imgur](https://i.imgur.com/39DMH6R.png)
    ^^^^ from JavaInstanceRunnable
    
    ![img](https://i.imgur.com/UDw5Lzt.png)
    ^^^^ from Function Context
    
    After some tests, I found that when the `LoggerContext` is obtained via `LoggerContext.getContext()`, the log appender added to that context takes effect for the `Function Context`, and the `Function Context`'s logger works as expected.
    
    Add `LogAppender` to the single context from `LoggerContext.getContext()`.
    
    (cherry picked from commit 81f1bed626fc750c68ab6740d8a80a6b2821b542)
---
 .../functions/instance/JavaInstanceRunnable.java   |  22 ++-
 .../integration/functions/PulsarFunctionsTest.java | 152 +++++++++++++++++++++
 .../functions/PulsarFunctionsTestBase.java         |   3 +
 3 files changed, 174 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fa68656..4d8c862 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -219,7 +219,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     ContextImpl setupContext() {
-        Logger instanceLog = LoggerFactory.getLogger(
+        Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, this.stats, stateManager);
@@ -464,6 +464,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             log.info("Unloading JAR files for function {}", instanceConfig);
             instanceCache = null;
         }
+
+        if (logAppender != null) {
+            removeLogTopicAppender(LoggerContext.getContext());
+            removeLogTopicAppender(LoggerContext.getContext(false));
+            logAppender.stop();
+            logAppender = null;
+        }
     }
 
     synchronized public String getStatsAsString() throws IOException {
@@ -563,28 +570,37 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
                     FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
             logAppender.start();
+            setupLogTopicAppender(LoggerContext.getContext());
         }
     }
 
     private void addLogTopicHandler() {
         if (logAppender == null) return;
-        LoggerContext context = LoggerContext.getContext(false);
+        setupLogTopicAppender(LoggerContext.getContext(false));
+    }
+
+    private void setupLogTopicAppender(LoggerContext context) {
         Configuration config = context.getConfiguration();
         config.addAppender(logAppender);
         for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
             loggerConfig.addAppender(logAppender, null, null);
         }
         config.getRootLogger().addAppender(logAppender, null, null);
+        context.updateLoggers();
     }
 
     private void removeLogTopicHandler() {
         if (logAppender == null) return;
-        LoggerContext context = LoggerContext.getContext(false);
+        removeLogTopicAppender(LoggerContext.getContext(false));
+    }
+
+    private void removeLogTopicAppender(LoggerContext context) {
         Configuration config = context.getConfiguration();
         for (final LoggerConfig loggerConfig : config.getLoggers().values()) {
             loggerConfig.removeAppender(logAppender.getName());
         }
         config.getRootLogger().removeAppender(logAppender.getName());
+        context.updateLoggers();
     }
 
     private void setupInput(ContextImpl contextImpl) throws Exception {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index d95d1d3..4e6ff98 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -1972,6 +1972,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    private static void checkPublisherCleanup(String topic) throws Exception {
+        try {
+            ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "topics",
+                    "stats",
+                    topic);
+            TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
+            assertEquals(topicStats.publishers.size(), 0);
+
+        } catch (ContainerExecException e) {
+            fail("Command should have exited with non-zero");
+        }
+    }
+
     private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
         getFunctionStatus(functionName, numMessages, checkRestarts, 1);
     }
@@ -2623,4 +2638,141 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    @Test(groups = {"java_function", "function"})
+    public void testJavaLoggingFunction() throws Exception {
+        testLoggingFunction(Runtime.JAVA);
+    }
+
+    private void testLoggingFunction(Runtime runtime) throws Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
+            // python can only run on process mode
+            return;
+        }
+
+        if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.GO) {
+            // go can only run on process mode
+            return;
+        }
+
+        if (pulsarCluster == null) {
+            super.setupCluster();
+            super.setupFunctionWorkers();
+        }
+        
+        Schema<?> schema;
+        if (Runtime.JAVA == runtime) {
+            schema = Schema.STRING;
+        } else {
+            schema = Schema.BYTES;
+        }
+
+        String inputTopicName = "persistent://public/default/test-log-" + runtime + "-input-" + randomName(8);
+        String logTopicName = "test-log-" + runtime + "-log-topic-" + randomName(8);
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(logTopicName);
+        }
+
+        String functionName = "test-logging-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the exclamation function
+        submitJavaLoggingFunction(
+                inputTopicName, logTopicName, functionName, schema);
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
+        // publish and consume result
+        publishAndConsumeMessages(inputTopicName, logTopicName, numMessages, "-log");
+
+        // get function status
+        getFunctionStatus(functionName, numMessages, true);
+
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+
+        // make sure subscriptions are cleanup
+        checkSubscriptionsCleanup(inputTopicName);
+        checkPublisherCleanup(logTopicName);
+
+    }
+
+    private static void submitJavaLoggingFunction(String inputTopicName,
+                                                  String logTopicName,
+                                                  String functionName,
+                                                  Schema<?> schema) throws Exception {
+        CommandGenerator generator;
+        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+        if (inputTopicName.endsWith(".*")) {
+            log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
+            generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+        } else {
+            log.info("----- CREATING REGULAR FUNCTION --- ");
+            generator = CommandGenerator.createDefaultGenerator(inputTopicName, LOGGING_JAVA_CLASS);
+        }
+        generator.setLogTopic(logTopicName);
+        generator.setFunctionName(functionName);
+        String command = generator.generateCreateFunctionCommand();
+
+        log.info("---------- Function command: {}", command);
+        String[] commands = {
+                "sh", "-c", command
+        };
+        ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
+                commands);
+        assertTrue(result.getStdout().contains("\"Created successfully\""));
+
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), schema);
+    }
+
+    private static void publishAndConsumeMessages(String inputTopic,
+                                                  String outputTopic,
+                                                  int numMessages,
+                                                  String messagePostfix) throws Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup Consumer<byte[]> consumer = client.newConsumer()
+                .topic(outputTopic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-sub")
+                .subscribe();
+
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(inputTopic)
+                .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("message-" + i);
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
+        for (int i = 0; i < numMessages; i++) {
+            expectedMessages.add("message-" + i + messagePostfix);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+            String logMsg = new String(msg.getValue(), UTF_8);
+            log.info("Received: {}", logMsg);
+            assertTrue(expectedMessages.contains(logMsg));
+            expectedMessages.remove(logMsg);
+        }
+
+        consumer.close();
+        producer.close();
+        client.close();
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 94bffb3..2538341 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -106,6 +106,9 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String EXCLAMATION_GO_FILE = "exclamationFunc";
     public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc";
 
+    public static final String LOGGING_JAVA_CLASS =
+            "org.apache.pulsar.functions.api.examples.LoggingFunction";
+
     protected static String getExclamationClass(Runtime runtime,
                                                 boolean pyZip,
                                                 boolean extraDeps) {


[pulsar] 02/04: [Admin] Expose offloaded storage size to the admin stats (#9335)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 41efa5d318b998ae4e39c2ecde3cbfd8b00f46bb
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Jan 29 03:08:57 2021 +0800

    [Admin] Expose offloaded storage size to the admin stats (#9335)
    
    *Motivation*
    
    Add offloaded storage size in the topic stats.
    
    (cherry picked from commit 2af0aedf4e11aa0b29a1706e48d1355196ab7eda)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
 .../java/org/apache/pulsar/broker/service/BrokerServiceTest.java     | 5 +++++
 .../main/java/org/apache/pulsar/common/policies/data/TopicStats.java | 5 +++++
 .../apache/pulsar/common/policies/data/PersistentTopicStatsTest.java | 3 +++
 4 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00dcd81..5ad3394 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1636,7 +1636,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.storageSize = ledger.getTotalSize();
         stats.backlogSize = ledger.getEstimatedBacklogSize();
         stats.deduplicationStatus = messageDeduplication.getStatus().toString();
-
+        stats.offloadedStorageSize = ledger.getOffloadedSize();
         return stats;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 97ddcca..5688f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -169,6 +169,9 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(subStats.msgBacklog, 0);
         assertEquals(subStats.consumers.size(), 1);
 
+        // storage stats
+        assertEquals(stats.offloadedStorageSize, 0);
+
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
@@ -206,6 +209,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
         assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
         assertNotNull(subStats.consumers.get(0).getClientVersion());
+        assertEquals(stats.offloadedStorageSize, 0);
 
         Message<byte[]> msg;
         for (int i = 0; i < 10; i++) {
@@ -218,6 +222,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         rolloverPerIntervalStats();
         stats = topicRef.getStats(false);
         subStats = stats.subscriptions.values().iterator().next();
+        assertEquals(stats.offloadedStorageSize, 0);
 
         assertEquals(subStats.msgBacklog, 0);
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index bc6716b..763f315 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -67,6 +67,9 @@ public class TopicStats {
     /** Get estimated total unconsumed or backlog size in bytes. */
     public long backlogSize;
 
+    /** Space used to store the offloaded messages for the topic. */
+    public long offloadedStorageSize;
+
     /** List of connected publishers on this topic w/ their stats. */
     public List<PublisherStats> publishers;
 
@@ -109,6 +112,7 @@ public class TopicStats {
         this.deduplicationStatus = null;
         this.nonContiguousDeletedMessagesRanges = 0;
         this.nonContiguousDeletedMessagesRangesSerializedSize = 0;
+        this.offloadedStorageSize = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -128,6 +132,7 @@ public class TopicStats {
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
         this.backlogSize += stats.backlogSize;
+        this.offloadedStorageSize += stats.offloadedStorageSize;
         this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         if (this.publishers.size() != stats.publishers.size()) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index cdbfcd5..dcfbd20 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -37,6 +37,7 @@ public class PersistentTopicStatsTest {
         topicStats.msgThroughputOut = 1;
         topicStats.averageMsgSize = 1;
         topicStats.storageSize = 1;
+        topicStats.offloadedStorageSize = 1;
         topicStats.publishers.add(new PublisherStats());
         topicStats.subscriptions.put("test_ns", new SubscriptionStats());
         topicStats.replication.put("test_ns", new ReplicatorStats());
@@ -47,6 +48,7 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.msgRateOut, 1.0);
         assertEquals(topicStats.msgThroughputOut, 1.0);
         assertEquals(topicStats.averageMsgSize, 1.0);
+        assertEquals(topicStats.offloadedStorageSize, 1);
         assertEquals(topicStats.storageSize, 1);
         assertEquals(topicStats.publishers.size(), 1);
         assertEquals(topicStats.subscriptions.size(), 1);
@@ -61,6 +63,7 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.publishers.size(), 0);
         assertEquals(topicStats.subscriptions.size(), 0);
         assertEquals(topicStats.replication.size(), 0);
+        assertEquals(topicStats.offloadedStorageSize, 0);
     }
 
     @Test


[pulsar] 01/04: [Pulsar SQL] Fix OffloadPolicies json serialization error in Pulsar SQL (#9300)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0159495896d68e9125410ab83f7717a886242c0c
Author: ran <ga...@126.com>
AuthorDate: Thu Jan 28 18:01:31 2021 +0800

    [Pulsar SQL] Fix OffloadPolicies json serialization error in Pulsar SQL (#9300)
    
    1. The class `OffloadPolicies` couldn't be serialized to a JSON byte array in Pulsar SQL.
    2. The PulsarSplitManager can't split data in the tiered storage.
    
    serialization error log
    ```
    2021-01-23T03:20:15.403Z	ERROR	remote-task-callback-9	io.airlift.concurrent.BoundedExecutor	Task failed
    java.lang.IllegalArgumentException: io.prestosql.server.TaskUpdateRequest could not be converted to JSON
    	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
    	at io.prestosql.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:513)
    	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.prestosql.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->io.prestosql.execution.TaskSource["splits"]->com.google.common.collect.Regul [...]
    	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
    	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1277)
    	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
    	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
    	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
    	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:640)
    	at io.prestosql.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
    	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
    	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
    	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
    	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
    	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
    	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
    	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
    	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
    	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
    	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
    	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
    	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
    	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
    	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400)
    	at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1509)
    	at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1215)
    	at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1109)
    	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
    	... 5 more
    ```
    
    1. Add `JsonProperty` annotation for `OffloadPolicies` config fields.
    2. Set the right LedgerOffloader for the split manager to read data from tiered storage.
    3. Add a new `apply-config-from-env-with-prefix.py` to set new configs with a specific prefix to config files.
    
    This change can be verified as follows:
    
      - *org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage*
    
    (cherry picked from commit 7dcc2ae2eb80404cbd90cc0884839683c9b748ec)
---
 docker/pulsar/Dockerfile                           |   1 +
 .../scripts/apply-config-from-env-with-prefix.py   | 100 ++++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   1 +
 .../common/policies/data/OffloadPolicies.java      |  29 ++-
 .../pulsar/sql/presto/PulsarSplitManager.java      |  26 +-
 .../pulsar/sql/presto/TestPulsarSplitManager.java  |  81 ++++++
 .../scripts/run-presto-worker.sh                   |   3 +-
 .../containers/PrestoWorkerContainer.java          |  11 +-
 .../integration/containers/PulsarContainer.java    |   2 +-
 .../tests/integration/presto/TestBasicPresto.java  | 198 ++-------------
 .../presto/TestPrestoQueryTieredStorage.java       | 279 +++++----------------
 .../integration/presto/TestPulsarSQLBase.java      | 249 ++++++++++++++++++
 .../integration/suites/PulsarSQLTestSuite.java     |  64 +++++
 .../integration/topologies/PulsarCluster.java      | 100 +++++---
 .../presto-coordinator-config.properties}          |  31 ++-
 .../presto-follow-worker-config.properties}        |  16 +-
 .../integration/src/test/resources/pulsar-sql.xml  |   1 +
 17 files changed, 735 insertions(+), 457 deletions(-)

diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile
index 715ca6f..23820fc 100644
--- a/docker/pulsar/Dockerfile
+++ b/docker/pulsar/Dockerfile
@@ -26,6 +26,7 @@ ADD ${PULSAR_TARBALL} /
 RUN mv /apache-pulsar-* /pulsar
 
 COPY scripts/apply-config-from-env.py /pulsar/bin
+COPY scripts/apply-config-from-env-with-prefix.py /pulsar/bin
 COPY scripts/gen-yml-from-env.py /pulsar/bin
 COPY scripts/generate-zookeeper-config.sh /pulsar/bin
 COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin
diff --git a/docker/pulsar/scripts/apply-config-from-env-with-prefix.py b/docker/pulsar/scripts/apply-config-from-env-with-prefix.py
new file mode 100755
index 0000000..670ba9a
--- /dev/null
+++ b/docker/pulsar/scripts/apply-config-from-env-with-prefix.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+## Edit a properties config file and replace values based on
+## the ENV variables
+## export prefix_my-key=new-value
+## ./apply-config-from-env-with-prefix.py prefix_ file.conf
+##
+
+import os, sys
+
+if len(sys.argv) < 3:
+    print('Usage: %s' % (sys.argv[0]))
+    sys.exit(1)
+
+# First argument is the env variable prefix; the remaining arguments are the config files to update
+prefix = sys.argv[1]
+conf_files = sys.argv[2:]
+
+
+for conf_filename in conf_files:
+    lines = []  # List of config file lines
+    keys = {} # Map a key to its line number in the file
+
+    # Load conf file
+    for line in open(conf_filename):
+        lines.append(line)
+        line = line.strip()
+        if not line or line.startswith('#'):
+            continue
+
+        try:
+            k,v = line.split('=', 1)
+            keys[k] = len(lines) - 1
+        except:
+            print("[%s] skip Processing %s" % (conf_filename, line))
+
+    # Update values from Env
+    for k in sorted(os.environ.keys()):
+        v = os.environ[k].strip()
+
+        # Hide the value in logs if it is a password.
+        if "password" in k:
+            displayValue = "********"
+        else:
+            displayValue = v
+
+        if k.startswith(prefix):
+            k = k[len(prefix):]
+        if k in keys:
+            print('[%s] Applying config %s = %s' % (conf_filename, k, displayValue))
+            idx = keys[k]
+            lines[idx] = '%s=%s\n' % (k, v)
+
+
+    # Add new keys from Env
+    for k in sorted(os.environ.keys()):
+        v = os.environ[k]
+        if not k.startswith(prefix):
+            continue
+
+        # Hide the value in logs if it is a password.
+        if "password" in k:
+            displayValue = "********"
+        else:
+            displayValue = v
+
+        k = k[len(prefix):]
+        if k not in keys:
+            print('[%s] Adding config %s = %s' % (conf_filename, k, displayValue))
+            lines.append('%s=%s\n' % (k, v))
+        else:
+            print('[%s] Updating config %s = %s' % (conf_filename, k, displayValue))
+            lines[keys[k]] = '%s=%s\n' % (k, v)
+
+
+    # Store back the updated config in the same file
+    f = open(conf_filename, 'w')
+    for line in lines:
+        f.write(line)
+    f.close()
+
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0434fab..6967a20 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3253,6 +3253,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
             return createManagedLedgerException(t.getCause());
         } else {
+            log.error("Unknown exception for ManagedLedgerException.", t);
             return new ManagedLedgerException("Unknown exception");
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 91cd2aa..f07a424 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.policies.data;
 
 import static org.apache.pulsar.common.util.FieldParser.value;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.lang.annotation.ElementType;
@@ -72,65 +73,89 @@ public class OffloadPolicies implements Serializable {
 
     // common config
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadDriver = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
-
     // s3 config, set by service configuration or cli
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRegion = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadBucket = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadServiceEndpoint = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer s3ManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer s3ManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
     // s3 config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRole = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload";
 
     // gcs config, set by service configuration or cli
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadRegion = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadBucket = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
     // gcs config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
 
     // file system config, set by service configuration
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String fileSystemProfilePath = null;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String fileSystemURI = null;
 
     // --------- new offload configurations ---------
     // they are universal configurations and could be used to `aws-s3`, `google-cloud-storage` or `azureblob`.
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadBucket;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadRegion;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private String managedLedgerOffloadServiceEndpoint;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadMaxBlockSizeInBytes;
     @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Integer managedLedgerOffloadReadBufferSizeInBytes;
 
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
@@ -511,7 +536,7 @@ public class OffloadPolicies implements Serializable {
         if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) {
             object = properties.getProperty("managedLedgerOffloadThresholdInBytes",
                     properties.getProperty(OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE));
-        } else if (field.getName().equals("")) {
+        } else if (field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) {
             object = properties.getProperty("managedLedgerOffloadDeletionLagInMillis",
                     properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE));
         } else {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index a22786a..c59553f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -128,6 +128,11 @@ public class PulsarSplitManager implements ConnectorSplitManager {
         try {
             OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces()
                                                 .getOffloadPolicies(topicName.getNamespace());
+            if (offloadPolicies != null) {
+                offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory());
+                offloadPolicies.setManagedLedgerOffloadMaxThreads(
+                        pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads());
+            }
             if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
                 splits = getSplitsNonPartitionedTopic(
                         numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies);
@@ -160,8 +165,10 @@ public class PulsarSplitManager implements ConnectorSplitManager {
 
         int splitRemainder = actualNumSplits % predicatedPartitions.size();
 
-        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
-                .getManagedLedgerFactory();
+        PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
+        ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory();
+        ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig(
+                topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig);
 
         List<PulsarSplit> splits = new LinkedList<>();
         for (int i = 0; i < predicatedPartitions.size(); i++) {
@@ -170,6 +177,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                 getSplitsForTopic(
                     topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(),
                     managedLedgerFactory,
+                    managedLedgerConfig,
                     splitsForThisPartition,
                     tableHandle,
                     schemaInfo,
@@ -231,12 +239,15 @@ public class PulsarSplitManager implements ConnectorSplitManager {
     Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
             PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain,
              OffloadPolicies offloadPolicies) throws Exception {
-        ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
-                .getManagedLedgerFactory();
+        PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
+        ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory();
+        ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig(
+                topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig);
 
         return getSplitsForTopic(
                 topicName.getPersistenceNamingEncoding(),
                 managedLedgerFactory,
+                managedLedgerConfig,
                 numSplits,
                 tableHandle,
                 schemaInfo,
@@ -248,6 +259,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
     @VisibleForTesting
     Collection<PulsarSplit> getSplitsForTopic(String topicNamePersistenceEncoding,
                                               ManagedLedgerFactory managedLedgerFactory,
+                                              ManagedLedgerConfig managedLedgerConfig,
                                               int numSplits,
                                               PulsarTableHandle tableHandle,
                                               SchemaInfo schemaInfo, String tableName,
@@ -259,7 +271,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
         try {
             readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                     topicNamePersistenceEncoding,
-                    PositionImpl.earliest, new ManagedLedgerConfig());
+                    PositionImpl.earliest, managedLedgerConfig);
 
             long numEntries = readOnlyCursor.getNumberOfEntries();
             if (numEntries <= 0) {
@@ -270,6 +282,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                     this.connectorId,
                     tupleDomain,
                     managedLedgerFactory,
+                    managedLedgerConfig,
                     topicNamePersistenceEncoding,
                     numEntries);
 
@@ -341,6 +354,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
         public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
                                                                      TupleDomain<ColumnHandle> tupleDomain,
                                                                      ManagedLedgerFactory managedLedgerFactory,
+                                                                     ManagedLedgerConfig managedLedgerConfig,
                                                                      String topicNamePersistenceEncoding,
                                                                      long totalNumEntries) throws
                 ManagedLedgerException, InterruptedException {
@@ -349,7 +363,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
             try {
                 readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(
                         topicNamePersistenceEncoding,
-                        PositionImpl.earliest, new ManagedLedgerConfig());
+                        PositionImpl.earliest, managedLedgerConfig);
 
                 if (tupleDomain.getDomains().isPresent()) {
                     Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 3d185c9..33a32a3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airlift.json.JsonCodec;
 import io.airlift.log.Logger;
 import io.prestosql.spi.connector.ColumnHandle;
 import io.prestosql.spi.connector.ConnectorSession;
@@ -28,12 +31,17 @@ import io.prestosql.spi.predicate.Range;
 import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.predicate.ValueSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -53,6 +61,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 public class TestPulsarSplitManager extends TestPulsarConnector {
 
@@ -401,4 +410,76 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
             pulsarTableLayoutHandle, null);
         assertNotNull(connectorSplitSource);
     }
+
+    @Test
+    public void pulsarSplitJsonCodecTest() throws JsonProcessingException, UnsupportedEncodingException {
+        OffloadPolicies offloadPolicies = OffloadPolicies.create(
+                "aws-s3",
+                "test-region",
+                "test-bucket",
+                "test-endpoint",
+                "test-credential-id",
+                "test-credential-secret",
+                5000,
+                2000,
+                1000L,
+                5000L
+        );
+
+        SchemaInfo schemaInfo = JSONSchema.of(Foo.class).getSchemaInfo();
+        final String schema = new String(schemaInfo.getSchema(),  "ISO8859-1");
+        final String originSchemaName = schemaInfo.getName();
+        final String schemaName = schemaInfo.getName();
+        final String schemaInfoProperties = new ObjectMapper().writeValueAsString(schemaInfo.getProperties());
+        final SchemaType schemaType = schemaInfo.getType();
+
+        final long splitId = 1;
+        final String connectorId = "connectorId";
+        final String tableName = "tableName";
+        final long splitSize = 5;
+        final long startPositionEntryId = 22;
+        final long endPositionEntryId = 33;
+        final long startPositionLedgerId = 10;
+        final long endPositionLedgerId = 21;
+        final TupleDomain<ColumnHandle> tupleDomain = TupleDomain.all();
+
+        byte[] pulsarSplitData;
+        JsonCodec<PulsarSplit> jsonCodec = JsonCodec.jsonCodec(PulsarSplit.class);
+        try {
+            PulsarSplit pulsarSplit = new PulsarSplit(
+                    splitId, connectorId, schemaName, originSchemaName, tableName, splitSize, schema,
+                    schemaType, startPositionEntryId, endPositionEntryId, startPositionLedgerId,
+                    endPositionLedgerId, tupleDomain, schemaInfoProperties, offloadPolicies);
+            pulsarSplitData = jsonCodec.toJsonBytes(pulsarSplit);
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("Failed to serialize the PulsarSplit.", e);
+            fail("Failed to serialize the PulsarSplit.");
+            return;
+        }
+
+        try {
+            PulsarSplit pulsarSplit = jsonCodec.fromJson(pulsarSplitData);
+            Assert.assertEquals(pulsarSplit.getSchema(), schema);
+            Assert.assertEquals(pulsarSplit.getOriginSchemaName(), originSchemaName);
+            Assert.assertEquals(pulsarSplit.getSchemaName(), schemaName);
+            Assert.assertEquals(pulsarSplit.getSchemaInfoProperties(), schemaInfoProperties);
+            Assert.assertEquals(pulsarSplit.getSchemaType(), schemaType);
+            Assert.assertEquals(pulsarSplit.getSplitId(), splitId);
+            Assert.assertEquals(pulsarSplit.getConnectorId(), connectorId);
+            Assert.assertEquals(pulsarSplit.getTableName(), tableName);
+            Assert.assertEquals(pulsarSplit.getSplitSize(), splitSize);
+            Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), startPositionEntryId);
+            Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), endPositionEntryId);
+            Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), startPositionLedgerId);
+            Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), endPositionLedgerId);
+            Assert.assertEquals(pulsarSplit.getTupleDomain(), tupleDomain);
+            Assert.assertEquals(pulsarSplit.getOffloadPolicies(), offloadPolicies);
+        } catch (Exception e) {
+            log.error("Failed to deserialize the PulsarSplit.", e);
+            fail("Failed to deserialize the PulsarSplit.");
+        }
+
+    }
+
 }
diff --git a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
index 1de9c1a..0d5166d 100755
--- a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
@@ -19,8 +19,7 @@
 #
 
 bin/set_python_version.sh
-
-bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \
+bin/apply-config-from-env-with-prefix.py SQL_PREFIX_ conf/presto/catalog/pulsar.properties && \
     bin/apply-config-from-env.py conf/pulsar_env.sh
 
 if [ -z "$NO_AUTOSTART" ]; then
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
index 71ebb48..9462363 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -36,11 +36,20 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
                 "bin/run-presto-worker.sh",
                 -1,
                 PRESTO_HTTP_PORT,
-                "/v1/node");
+                "/v1/info/state");
 
     }
 
     @Override
+    protected void afterStart() {
+        this.tailContainerLog();
+        DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+                "tail", "-f", "/var/log/pulsar/presto_worker.log");
+        DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(),
+                "tail", "-f", "/pulsar/lib/presto/var/log/server.log");
+    }
+
+    @Override
     protected void beforeStop() {
         super.beforeStop();
         if (null != containerId) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 54c21df..f13875a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -149,7 +149,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
         beforeStart();
         super.start();
         afterStart();
-        log.info("Start pulsar service {} at container {}", serviceName, containerName);
+        log.info("[{}] Start pulsar service {} at container {}", getContainerName(), serviceName, getContainerId());
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index bb54c93..c40002e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -23,96 +23,51 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.awaitility.Awaitility;
+import org.apache.pulsar.common.naming.TopicName;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
 @Slf4j
-public class TestBasicPresto extends PulsarTestSuite {
+public class TestBasicPresto extends TestPulsarSQLBase {
 
     private static final int NUM_OF_STOCKS = 10;
 
-    @Override
-    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
-        return super.beforeSetupCluster(clusterName, specBuilder.queryLastMessage(true));
-    }
-
     @BeforeClass
     public void setupPresto() throws Exception {
-        log.info("[setupPresto]");
+        log.info("[TestBasicPresto] setupPresto...");
         pulsarCluster.startPrestoWorker();
     }
 
     @AfterClass
     public void teardownPresto() {
-        log.info("tearing down...");
+        log.info("[TestBasicPresto] tearing down...");
         pulsarCluster.stopPrestoWorker();
     }
 
     @Test
     public void testSimpleSQLQueryBatched() throws Exception {
-        testSimpleSQLQuery(true);
+        TopicName topicName = TopicName.get("public/default/stocks_batched_" + randomName(5));
+        pulsarSQLBasicTest(topicName, true, false);
     }
 
     @Test
     public void testSimpleSQLQueryNonBatched() throws Exception {
-        testSimpleSQLQuery(false);
+        TopicName topicName = TopicName.get("public/default/stocks_nonbatched_" + randomName(5));
+        pulsarSQLBasicTest(topicName, false, false);
     }
-    
-    public void testSimpleSQLQuery(boolean isBatched) throws Exception {
-
-        // wait until presto worker started
-        ContainerExecResult result;
-        do {
-            try {
-                result = execQuery("show catalogs;");
-                assertThat(result.getExitCode()).isEqualTo(0);
-                assertThat(result.getStdout()).contains("pulsar", "system");
-                break;
-            } catch (ContainerExecException cee) {
-                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
-                    Thread.sleep(10000);
-                } else {
-                    throw cee;
-                }
-            }
-        } while (true);
 
+    @Override
+    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
-                                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                                    .build();
-
-        String stocksTopic;
-        if (isBatched) {
-            stocksTopic = "stocks_batched";
-        } else {
-            stocksTopic = "stocks_nonbatched";
-        }
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
 
         @Cleanup
         Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
-                .topic(stocksTopic)
-                .enableBatching(isBatched)
+                .topic(topicName.toString())
+                .enableBatching(isBatch)
                 .create();
 
         for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
@@ -120,129 +75,6 @@ public class TestBasicPresto extends PulsarTestSuite {
             producer.send(stock);
         }
         producer.flush();
-
-        result = execQuery("show schemas in pulsar;");
-        assertThat(result.getExitCode()).isEqualTo(0);
-        assertThat(result.getStdout()).contains("public/default");
-
-        pulsarCluster.getBroker(0)
-            .execCmd(
-                "/bin/bash",
-                "-c", "bin/pulsar-admin namespaces unload public/default");
-
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
-            () -> {
-                ContainerExecResult r = execQuery("show tables in pulsar.\"public/default\";");
-                assertThat(r.getExitCode()).isEqualTo(0);
-                assertThat(r.getStdout()).contains("stocks");
-            }
-        );
-
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
-            () -> {
-                ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
-                assertThat(containerExecResult.getExitCode()).isEqualTo(0);
-                log.info("select sql query output \n{}", containerExecResult.getStdout());
-                String[] split = containerExecResult.getStdout().split("\n");
-                assertThat(split.length).isEqualTo(NUM_OF_STOCKS);
-                String[] split2 = containerExecResult.getStdout().split("\n|,");
-                for (int i = 0; i < NUM_OF_STOCKS; ++i) {
-                    assertThat(split2).contains("\"" + i + "\"");
-                    assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
-                    assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
-                }
-            }
-        );
-
-        // test predicate pushdown
-
-        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
-        Connection connection = DriverManager.getConnection(url, "test", null);
-
-        String query = String.format("select * from pulsar" +
-                ".\"public/default\".%s order by __publish_time__", stocksTopic);
-        log.info("Executing query: {}", query);
-        ResultSet res = connection.createStatement().executeQuery(query);
-
-        List<Timestamp> timestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            timestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
-
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        List<Timestamp> returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2);
-
-        // Try with a predicate that has a earlier time than any entry
-        // Should return all rows
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
-
-        // Try with a predicate that has a latter time than any entry
-        // Should return no rows
-
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size()).isEqualTo(0);
-    }
-
-    @AfterSuite
-    @Override
-    public void tearDownCluster() {
-        super.tearDownCluster();
-    }
-
-    public static ContainerExecResult execQuery(final String query) throws Exception {
-        ContainerExecResult containerExecResult;
-
-        containerExecResult = pulsarCluster.getPrestoWorkerContainer()
-                .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
-
-        return containerExecResult;
-
+        return NUM_OF_STOCKS;
     }
-
-    private static void printCurrent(ResultSet rs) throws SQLException {
-        ResultSetMetaData rsmd = rs.getMetaData();
-        int columnsNumber = rsmd.getColumnCount();
-        for (int i = 1; i <= columnsNumber; i++) {
-            if (i > 1) System.out.print(",  ");
-            String columnValue = rs.getString(i);
-            System.out.print(columnValue + " " + rsmd.getColumnName(i));
-        }
-        System.out.println("");
-
-    }
-
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
index 42f1ea9..cd0d348 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java
@@ -18,61 +18,53 @@
  */
 package org.apache.pulsar.tests.integration.presto;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.containers.S3Container;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
 
+/**
+ * Test presto query from tiered storage.
+ */
 @Slf4j
-public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
+public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase {
 
-    private final static int ENTRIES_PER_LEDGER = 1024;
-    private final static String OFFLOAD_DRIVER = "s3";
-    private final static String BUCKET = "pulsar-integtest";
-    private final static String ENDPOINT = "http://" + S3Container.NAME + ":9090";
+    private final String TENANT = "presto";
+    private final String NAMESPACE = "ts";
 
     private S3Container s3Container;
 
-    @Override
-    protected void beforeStartCluster() throws Exception {
-        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
-            getEnv().forEach(brokerContainer::withEnv);
-        }
-    }
-
     @BeforeClass
-    public void setupPresto() throws Exception {
+    public void setupExtraContainers() throws Exception {
+        log.info("[TestPrestoQueryTieredStorage] setupExtraContainers...");
+        pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", TENANT);
+
+        pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(),
+                NamespaceName.get(TENANT, NAMESPACE).toString());
+
         s3Container = new S3Container(
                 pulsarCluster.getClusterName(),
                 S3Container.NAME)
@@ -80,11 +72,12 @@ public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
                 .withNetworkAliases(S3Container.NAME);
         s3Container.start();
 
-        log.info("[setupPresto] prestoWorker: " + pulsarCluster.getPrestoWorkerContainer());
-        pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, getOffloadProperties(BUCKET, null, ENDPOINT));
+        String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT);
+        pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
+        pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
     }
 
-    public String getOffloadProperties(String bucket, String region, String endpoint) {
+    private String getOffloadProperties(String bucket, String region, String endpoint) {
         checkNotNull(bucket);
         StringBuilder sb = new StringBuilder();
         sb.append("{");
@@ -99,10 +92,9 @@ public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
         return sb.toString();
     }
 
-
     @AfterClass
     public void teardownPresto() {
-        log.info("tearing down...");
+        log.info("[TestPrestoQueryTieredStorage] tearing down...");
         if (null != s3Container) {
             s3Container.stop();
         }
@@ -110,213 +102,91 @@ public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
         pulsarCluster.stopPrestoWorker();
     }
 
-    // Flaky Test: https://github.com/apache/pulsar/issues/7750
-    // @Test
+    @Test
     public void testQueryTieredStorage1() throws Exception {
-        testSimpleSQLQuery(false);
+        TopicName topicName = TopicName.get(
+                TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_nons_" + randomName(5));
+        pulsarSQLBasicTest(topicName, false, false);
     }
 
-    // Flaky Test: https://github.com/apache/pulsar/issues/7750
-    // @Test
+    @Test
     public void testQueryTieredStorage2() throws Exception {
-        testSimpleSQLQuery(true);
+        TopicName topicName = TopicName.get(
+                TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_ns_" + randomName(5));
+        pulsarSQLBasicTest(topicName, false, true);
     }
 
-    public void testSimpleSQLQuery(boolean isNamespaceOffload) throws Exception {
-
-        // wait until presto worker started
-        ContainerExecResult result;
-        do {
-            try {
-                result = execQuery("show catalogs;");
-                assertThat(result.getExitCode()).isEqualTo(0);
-                assertThat(result.getStdout()).contains("pulsar", "system");
-                break;
-            } catch (ContainerExecException cee) {
-                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
-                    Thread.sleep(10000);
-                } else {
-                    throw cee;
-                }
-            }
-        } while (true);
-
+    @Override
+    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
         @Cleanup
         PulsarClient pulsarClient = PulsarClient.builder()
-                                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                                    .build();
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
 
-        String stocksTopic = "stocks-" + randomName(5);
+        @Cleanup
+        Consumer<Stock> consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class))
+                .topic(topicName.toString())
+                .subscriptionName("test")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
 
         @Cleanup
         Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
-                .topic(stocksTopic)
+                .topic(topicName.toString())
                 .create();
 
         long firstLedgerId = -1;
-        long currentLedgerId = -1;
         int sendMessageCnt = 0;
-        while (currentLedgerId <= firstLedgerId) {
-            sendMessageCnt ++;
-            final Stock stock = new Stock(sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
+        while (true) {
+            Stock stock = new Stock(
+                    sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10);
             MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
+            sendMessageCnt ++;
             if (firstLedgerId == -1) {
                 firstLedgerId = messageId.getLedgerId();
             }
-            currentLedgerId = messageId.getLedgerId();
-            log.info("firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId);
+            if (messageId.getLedgerId() > firstLedgerId) {
+                log.info("ledger rollover firstLedgerId: {}, currentLedgerId: {}",
+                        firstLedgerId, messageId.getLedgerId());
+                break;
+            }
             Thread.sleep(100);
         }
-        producer.flush();
-
-        offloadAndDeleteFromBK(isNamespaceOffload, stocksTopic);
-
-        // check schema
-        result = execQuery("show schemas in pulsar;");
-        assertThat(result.getExitCode()).isEqualTo(0);
-        assertThat(result.getStdout()).contains("public/default");
-
-        // check table
-        result = execQuery("show tables in pulsar.\"public/default\";");
-        assertThat(result.getExitCode()).isEqualTo(0);
-        assertThat(result.getStdout()).contains(stocksTopic);
-
-        // check query
-        ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
-        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
-        log.info("select sql query output \n{}", containerExecResult.getStdout());
-        String[] split = containerExecResult.getStdout().split("\n");
-        assertThat(split.length).isGreaterThan(sendMessageCnt - 2);
-
-        String[] split2 = containerExecResult.getStdout().split("\n|,");
-
-        for (int i = 0; i < sendMessageCnt - 2; ++i) {
-            assertThat(split2).contains("\"" + i + "\"");
-            assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
-            assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
-        }
-
-        // test predicate pushdown
-
-        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
-        Connection connection = DriverManager.getConnection(url, "test", null);
 
-        String query = String.format("select * from pulsar" +
-                ".\"public/default\".%s order by __publish_time__", stocksTopic);
-        log.info("Executing query: {}", query);
-        ResultSet res = connection.createStatement().executeQuery(query);
-
-        List<Timestamp> timestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            timestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(timestamps.size()).isGreaterThan(sendMessageCnt - 2);
-
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        List<Timestamp> returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
-
-        // Try with a predicate that has a earlier time than any entry
-        // Should return all rows
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
-
-        // Try with a predicate that has a latter time than any entry
-        // Should return no rows
-
-        query = String.format("select * from pulsar" +
-                ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
-        log.info("Executing query: {}", query);
-        res = connection.createStatement().executeQuery(query);
-
-        returnedTimestamps = new LinkedList<>();
-        while (res.next()) {
-            printCurrent(res);
-            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
-        }
-
-        assertThat(returnedTimestamps.size()).isEqualTo(0);
+        offloadAndDeleteFromBK(useNsOffloadPolices, topicName);
+        return sendMessageCnt;
     }
 
-    @AfterSuite
-    @Override
-    public void tearDownCluster() {
-        super.tearDownCluster();
-    }
-
-    public static ContainerExecResult execQuery(final String query) throws Exception {
-        ContainerExecResult containerExecResult;
-
-        containerExecResult = pulsarCluster.getPrestoWorkerContainer()
-                .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
-
-        return containerExecResult;
-
-    }
-
-    private static void printCurrent(ResultSet rs) throws SQLException {
-        ResultSetMetaData rsmd = rs.getMetaData();
-        int columnsNumber = rsmd.getColumnCount();
-        for (int i = 1; i <= columnsNumber; i++) {
-            if (i > 1) System.out.print(",  ");
-            String columnValue = rs.getString(i);
-            System.out.print(columnValue + " " + rsmd.getColumnName(i));
-        }
-        System.out.println("");
-
-    }
-
-    private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTopic) {
+    private void offloadAndDeleteFromBK(boolean useNsOffloadPolices, TopicName topicName) {
         String adminUrl = pulsarCluster.getHttpServiceUrl();
         try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
             // read managed ledger info, check ledgers exist
-            long firstLedger = admin.topics().getInternalStats(stocksTopic).ledgers.get(0).ledgerId;
+            long firstLedger = admin.topics().getInternalStats(topicName.toString()).ledgers.get(0).ledgerId;
 
             String output = "";
 
-            if (isNamespaceOffload) {
+            if (useNsOffloadPolices) {
                 pulsarCluster.runAdminCommandOnAnyBroker(
                         "namespaces", "set-offload-policies",
                         "--bucket", "pulsar-integtest",
-                        "--driver", "s3",
+                        "--driver", OFFLOAD_DRIVER,
                         "--endpoint", "http://" + S3Container.NAME + ":9090",
                         "--offloadAfterElapsed", "1000",
-                        "public/default");
+                        topicName.getNamespace());
 
                 output = pulsarCluster.runAdminCommandOnAnyBroker(
-                        "namespaces", "get-offload-policies").getStdout();
+                        "namespaces", "get-offload-policies", topicName.getNamespace()).getStdout();
                 Assert.assertTrue(output.contains("pulsar-integtest"));
-                Assert.assertTrue(output.contains("s3"));
+                Assert.assertTrue(output.contains(OFFLOAD_DRIVER));
             }
 
             // offload with a low threshold
             output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                    "offload", "--size-threshold", "1M", stocksTopic).getStdout();
+                    "offload", "--size-threshold", "0", topicName.toString()).getStdout();
             Assert.assertTrue(output.contains("Offload triggered"));
 
             output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                    "offload-status", "-w", stocksTopic).getStdout();
+                    "offload-status", "-w", topicName.toString()).getStdout();
             Assert.assertTrue(output.contains("Offload was a success"));
 
             // delete the first ledger, so that we cannot possibly read from it
@@ -330,21 +200,10 @@ public class TestPrestoQueryTieredStorage extends PulsarTestSuite {
             }
 
             // Unload topic to clear all caches, open handles, etc
-            admin.topics().unload(stocksTopic);
+            admin.topics().unload(topicName.toString());
         } catch (Exception e) {
             Assert.fail("Failed to deleteOffloadedDataFromBK.");
         }
     }
 
-    protected Map<String, String> getEnv() {
-        Map<String, String> result = new HashMap<>();
-        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
-        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
-        result.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER);
-        result.put("s3ManagedLedgerOffloadBucket", BUCKET);
-        result.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT);
-
-        return result;
-    }
-
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
new file mode 100644
index 0000000..b4caa9f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.base.Stopwatch;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.awaitility.Awaitility;
+import org.testcontainers.shaded.okhttp3.OkHttpClient;
+import org.testcontainers.shaded.okhttp3.Request;
+import org.testcontainers.shaded.okhttp3.Response;
+import org.testng.Assert;
+
+@Slf4j
+public class TestPulsarSQLBase extends PulsarSQLTestSuite {
+
+    protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
+        waitPulsarSQLReady();
+        // Publish the fixture messages; prepareData reports how many were sent.
+        log.info("start prepare data for query. topic: {}", topic);
+        final int publishedCount = prepareData(topic, isBatch, useNsOffloadPolices);
+        log.info("finish prepare data for query. topic: {}, messageCnt: {}", topic, publishedCount);
+        // The namespace and table must be visible before any row-level checks run.
+        validateMetadata(topic);
+        // Finally compare the queried rows against the published count.
+        validateData(topic, publishedCount);
+    }
+
+    private void waitPulsarSQLReady() throws Exception {
+        // Wait for the Presto worker: re-run "show catalogs" while the server says it is still initializing.
+        ContainerExecResult result;
+        do {
+            try {
+                result = execQuery("show catalogs;");
+                assertThat(result.getExitCode()).isEqualTo(0);
+                assertThat(result.getStdout()).contains("pulsar", "system");
+                break;
+            } catch (ContainerExecException cee) {
+                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
+                    Thread.sleep(10000); // still starting up: wait 10 s and ask again
+                } else {
+                    throw cee; // any other failure is propagated to the caller
+                }
+            }
+        } while (true); // NOTE(review): no overall deadline on this retry loop
+
+        // When follow workers are configured, poll the worker's /v1/node endpoint until it lists a node.
+        if (pulsarCluster.getSqlFollowWorkerContainers() != null
+                && pulsarCluster.getSqlFollowWorkerContainers().size() > 0) {
+            OkHttpClient okHttpClient = new OkHttpClient();
+            Request request = new Request.Builder()
+                    .url("http://" + pulsarCluster.getPrestoWorkerContainer().getUrl() + "/v1/node")
+                    .build();
+            do {
+                try (Response response = okHttpClient.newCall(request).execute()) {
+                    Assert.assertNotNull(response.body());
+                    String nodeJsonStr = response.body().string();
+                    Assert.assertTrue(nodeJsonStr.length() > 0);
+                    log.info("presto node info: {}", nodeJsonStr);
+                    if (nodeJsonStr.contains("uri")) { // a "uri" key is treated as proof that a node is listed
+                        log.info("presto node exist.");
+                        break;
+                    }
+                    Thread.sleep(1000); // nothing listed yet: poll again in 1 s
+                }
+            } while (true); // NOTE(review): this poll has no deadline either
+        }
+    }
+
+    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception {
+        throw new UnsupportedOperationException("Unsupported operation prepareData."); // subclasses override this
+    }
+
+    private void validateMetadata(TopicName topicName) throws Exception {
+        // The namespace has to show up as a schema of the "pulsar" catalog.
+        final String namespace = topicName.getNamespace();
+        final ContainerExecResult schemas = execQuery("show schemas in pulsar;");
+        assertThat(schemas.getExitCode()).isEqualTo(0);
+        assertThat(schemas.getStdout()).contains(namespace);
+
+        // Unload the namespace on the first broker before listing its tables.
+        final String unloadCommand = "bin/pulsar-admin namespaces unload " + namespace;
+        pulsarCluster.getBroker(0).execCmd("/bin/bash", "-c", unloadCommand);
+
+        // Poll for up to 10 seconds until the topic is listed as a table of the namespace.
+        final String showTables = String.format("show tables in pulsar.\"%s\";", namespace);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            final ContainerExecResult tables = execQuery(showTables);
+            assertThat(tables.getExitCode()).isEqualTo(0);
+            assertThat(tables.getStdout()).contains(topicName.getLocalName());
+        });
+    }
+
+    /**
+     * Checks the rows Presto returns for the topic, first through the SQL CLI and then over JDBC, where
+     * {@code __publish_time__} is filtered three ways: later than the middle row, later than epoch 0 and
+     * later than a far-future instant. The connection and every statement are always closed.
+     *
+     * @param topicName topic whose namespace and local name identify the table to query
+     * @param messageNum number of messages the test published to the topic
+     * @throws Exception if a query cannot be executed
+     */
+    private void validateData(TopicName topicName, int messageNum) throws Exception {
+        String namespace = topicName.getNamespace();
+        String topic = topicName.getLocalName();
+
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
+                () -> {
+                    ContainerExecResult containerExecResult = execQuery(
+                            String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic));
+                    assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+                    log.info("select sql query output \n{}", containerExecResult.getStdout());
+                    String[] split = containerExecResult.getStdout().split("\n");
+                    assertThat(split.length).isEqualTo(messageNum);
+                    String[] split2 = containerExecResult.getStdout().split("\n|,");
+                    for (int i = 0; i < messageNum; ++i) {
+                        assertThat(split2).contains("\"" + i + "\"");
+                        assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
+                        assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
+                    }
+                }
+        );
+
+        // test predicate pushdown
+        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
+        String table = String.format("pulsar.\"%s\".\"%s\"", namespace, topic);
+        // try-with-resources so the JDBC connection is released even when an assertion below fails
+        try (Connection connection = DriverManager.getConnection(url, "test", null)) {
+            List<Timestamp> timestamps = queryPublishTimes(connection,
+                    String.format("select * from %s order by __publish_time__", table));
+            log.info("Executing query: result for topic {} timestamps size {}", topic, timestamps.size());
+            assertThat(timestamps.size()).isGreaterThan(messageNum - 2);
+
+            List<Timestamp> returnedTimestamps = queryPublishTimes(connection, String.format(
+                    "select * from %s where __publish_time__ > timestamp '%s' order by __publish_time__",
+                    table, timestamps.get(timestamps.size() / 2)));
+            log.info("Executing query: result for topic {} returnedTimestamps size: {}",
+                    topic, returnedTimestamps.size());
+            if (timestamps.size() % 2 == 0) {
+                // for example: total size 10, the right receive number is 4, so 4 + 1 == 10 / 2
+                assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2);
+            } else {
+                // for example: total size 101, the right receive number is 50, so 50 == (101 - 1) / 2
+                assertThat(returnedTimestamps.size()).isEqualTo((timestamps.size() - 1) / 2);
+            }
+
+            // Try with a predicate that has an earlier time than any entry
+            // Should return all rows
+            returnedTimestamps = queryPublishTimes(connection, String.format(
+                    "select * from %s where __publish_time__ > from_unixtime(%s) order by __publish_time__",
+                    table, 0));
+            log.info("Executing query: result for topic {} returnedTimestamps size: {}",
+                    topic, returnedTimestamps.size());
+            assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
+
+            // Try with a predicate that has a later time than any entry
+            // Should return no rows
+            returnedTimestamps = queryPublishTimes(connection, String.format(
+                    "select * from %s where __publish_time__ > from_unixtime(%s) order by __publish_time__",
+                    table, 99999999999L));
+            log.info("Executing query: result for topic {} returnedTimestamps size: {}",
+                    topic, returnedTimestamps.size());
+            assertThat(returnedTimestamps.size()).isEqualTo(0);
+        }
+    }
+
+    /**
+     * Runs {@code query} on {@code connection}, prints every row and collects its publish time.
+     *
+     * @param connection open JDBC connection to the Presto worker; left open for the caller to close
+     * @param query SQL text to execute; its result must expose a {@code __publish_time__} column
+     * @return the {@code __publish_time__} values in the order the rows were returned
+     * @throws SQLException if the statement cannot be created, executed or read
+     */
+    private static List<Timestamp> queryPublishTimes(Connection connection, String query) throws SQLException {
+        log.info("Executing query: {}", query);
+        List<Timestamp> publishTimes = new LinkedList<>();
+        try (java.sql.Statement statement = connection.createStatement();
+                ResultSet rows = statement.executeQuery(query)) {
+            while (rows.next()) {
+                printCurrent(rows);
+                publishTimes.add(rows.getTimestamp("__publish_time__"));
+            }
+        }
+        return publishTimes;
+    }
+
+    public static ContainerExecResult execQuery(final String query) throws Exception {
+        // Same shell command for the first attempt and every retry.
+        final String sqlCommand = PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'";
+        ContainerExecResult outcome = pulsarCluster.getPrestoWorkerContainer()
+                .execCmd("/bin/bash", "-c", sqlCommand);
+
+        // Retry every 500 ms while the exit code is non-zero, giving up once 120 seconds have elapsed.
+        final Stopwatch retryClock = Stopwatch.createStarted();
+        for (;;) {
+            if (outcome.getExitCode() == 0 || retryClock.elapsed(TimeUnit.SECONDS) >= 120) {
+                return outcome;
+            }
+            TimeUnit.MILLISECONDS.sleep(500);
+            outcome = pulsarCluster.getPrestoWorkerContainer().execCmd("/bin/bash", "-c", sqlCommand);
+        }
+    }
+
+    private static void printCurrent(ResultSet rs) throws SQLException {
+        // Writes the current row to stdout as "<value> <columnName>" pairs; JDBC column indexes start at 1.
+        final ResultSetMetaData metaData = rs.getMetaData();
+        for (int column = 1, last = metaData.getColumnCount(); column <= last; column++) {
+            if (column != 1) {
+                System.out.print(",  ");
+            }
+            System.out.print(rs.getString(column) + " " + metaData.getColumnName(column));
+        }
+        System.out.println("");
+    }
+
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
new file mode 100644
index 0000000..5863ae6
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.suites;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+
+/** Base class for Pulsar SQL integration suites; sets aws-s3 offload env vars on every broker container. */
+@Slf4j
+public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
+    public static final int ENTRIES_PER_LEDGER = 100;
+    public static final String OFFLOAD_DRIVER = "aws-s3";
+    public static final String BUCKET = "pulsar-integtest";
+    public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090";
+
+    @Override
+    public String getTestName() {
+        return "pulsar-sql-test-suite";
+    }
+
+    /** Overrides the spec: last-message querying on, cluster name "pulsar-sql-test", one broker. */
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        specBuilder.queryLastMessage(true);
+        specBuilder.clusterName("pulsar-sql-test");
+        specBuilder.numBrokers(1);
+        return super.beforeSetupCluster(clusterName, specBuilder);
+    }
+
+    /** Adds the ledger rollover and offload settings to the environment of each broker container. */
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        final Map<String, String> brokerEnv = new HashMap<>();
+        brokerEnv.put("managedLedgerMaxEntriesPerLedger", Integer.toString(ENTRIES_PER_LEDGER));
+        brokerEnv.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        brokerEnv.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER);
+        brokerEnv.put("s3ManagedLedgerOffloadBucket", BUCKET);
+        brokerEnv.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT);
+        for (BrokerContainer broker : pulsarCluster.getBrokers()) {
+            broker.withEnv(brokerEnv);
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index c75bd59e..7e7c642 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -83,6 +83,8 @@ public class PulsarCluster {
     private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
     private PrestoWorkerContainer prestoWorkerContainer;
+    @Getter
+    private Map<String, PrestoWorkerContainer> sqlFollowWorkerContainers;
     private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
     private final boolean enablePrestoWorker;
 
@@ -93,17 +95,10 @@ public class PulsarCluster {
         this.network = Network.newNetwork();
         this.enablePrestoWorker = spec.enablePrestoWorker();
 
+        this.sqlFollowWorkerContainers = Maps.newTreeMap();
         if (enablePrestoWorker) {
-            prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
-                .withNetwork(network)
-                .withNetworkAliases(PrestoWorkerContainer.NAME)
-                .withEnv("clusterName", clusterName)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                .withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
-                .withEnv("pulsar.bookkeeper-explicit-interval", "10")
-                .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+            prestoWorkerContainer = buildPrestoWorkerContainer(
+                    PrestoWorkerContainer.NAME, true, null, null);
         } else {
             prestoWorkerContainer = null;
         }
@@ -349,41 +344,78 @@ public class PulsarCluster {
     public void startPrestoWorker(String offloadDriver, String offloadProperties) {
         log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties);
         if (null == prestoWorkerContainer) {
-            prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
-                .withNetwork(network)
-                .withNetworkAliases(PrestoWorkerContainer.NAME)
-                .withEnv("clusterName", clusterName)
-                .withEnv("zkServers", ZKContainer.NAME)
-                .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
-                .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
-            if (spec.queryLastMessage) {
-                prestoWorkerContainer.withEnv("pulsar.bookkeeper-use-v2-protocol", "false")
-                    .withEnv("pulsar.bookkeeper-explicit-interval", "10");
-            }
-            if (offloadDriver != null && offloadProperties != null) {
-                log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
-                        offloadDriver, offloadProperties);
-                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver);
-                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloader-properties", offloadProperties);
-                prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders");
-                // used in s3 tests
-                prestoWorkerContainer.withEnv("AWS_ACCESS_KEY_ID", "accesskey");
-                prestoWorkerContainer.withEnv("AWS_SECRET_KEY", "secretkey");
-            }
+            prestoWorkerContainer = buildPrestoWorkerContainer(
+                    PrestoWorkerContainer.NAME, true, offloadDriver, offloadProperties); // isCoordinator=true; offload args apply only when built here
         }
-        log.info("[startPrestoWorker] Starting Presto Worker");
         prestoWorkerContainer.start();
+        log.info("[{}] Presto coordinator start finished.", prestoWorkerContainer.getContainerName());
     }
 
     public void stopPrestoWorker() {
+        if (sqlFollowWorkerContainers != null && sqlFollowWorkerContainers.size() > 0) { // follow workers are stopped before the coordinator
+            for (PrestoWorkerContainer followWorker : sqlFollowWorkerContainers.values()) {
+                followWorker.stop();
+                log.info("Stopped presto follow worker {}.", followWorker.getContainerName());
+            }
+            sqlFollowWorkerContainers.clear(); // drop the references to the stopped containers
+            log.info("Stopped all presto follow workers.");
+        }
         if (null != prestoWorkerContainer) {
             prestoWorkerContainer.stop();
-            log.info("Stopped Presto Worker");
+            log.info("Stopped presto coordinator.");
             prestoWorkerContainer = null;
         }
     }
 
+    /**
+     * Builds {@code numSqlFollowWorkers} non-coordinator Presto containers, adds them to
+     * {@code sqlFollowWorkerContainers} and then starts every container held in that map in parallel.
+     */
+    public void startPrestoFollowWorkers(int numSqlFollowWorkers, String offloadDriver, String offloadProperties) {
+        log.info("start presto follow worker containers.");
+        sqlFollowWorkerContainers.putAll(runNumContainers("sql-follow-worker", numSqlFollowWorkers, workerName -> {
+            log.info("build presto follow worker with name {}", workerName);
+            return buildPrestoWorkerContainer(workerName, false, offloadDriver, offloadProperties);
+        }));
+        // start() runs on all map entries, including any that were registered by an earlier call
+        sqlFollowWorkerContainers.values().parallelStream().forEach(worker -> worker.start());
+        log.info("Successfully started {} presto follow worker containers.", sqlFollowWorkerContainers.size());
+    }
+
+    /** Creates, without starting, a Presto container; {@code isCoordinator} picks the mounted config resource. */
+    private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolean isCoordinator,
+                                                             String offloadDriver, String offloadProperties) {
+        final String zkConnect = ZKContainer.NAME + ":" + ZKContainer.ZK_PORT;
+        final String resourcePath = isCoordinator
+                ? "presto-coordinator-config.properties" : "presto-follow-worker-config.properties";
+        PrestoWorkerContainer container = new PrestoWorkerContainer(clusterName, hostName)
+                .withNetwork(network)
+                .withNetworkAliases(hostName)
+                .withEnv("clusterName", clusterName)
+                .withEnv("zkServers", ZKContainer.NAME)
+                .withEnv("zookeeperServers", zkConnect)
+                .withEnv("pulsar.zookeeper-uri", zkConnect)
+                .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080")
+                .withClasspathResourceMapping(resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
+        if (spec.queryLastMessage) {
+            container.withEnv("pulsar.bookkeeper-use-v2-protocol", "false");
+            container.withEnv("pulsar.bookkeeper-explicit-interval", "10");
+        }
+        if (offloadDriver != null && offloadProperties != null) {
+            log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}",
+                    offloadDriver, offloadProperties);
+            // used to query from tiered storage
+            container.withEnv("SQL_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver)
+                    .withEnv("SQL_PREFIX_pulsar.offloader-properties", offloadProperties)
+                    .withEnv("SQL_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders")
+                    .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
+                    .withEnv("AWS_SECRET_KEY", "secretkey");
+        }
+        log.info("[{}] build presto worker container. isCoordinator: {}, resourcePath: {}",
+                container.getContainerName(), isCoordinator, resourcePath);
+        return container;
+    }
+
     public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) {
         switch (runtimeType) {
             case THREAD:
diff --git a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh b/tests/integration/src/test/resources/presto-coordinator-config.properties
old mode 100755
new mode 100644
similarity index 54%
copy from tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
copy to tests/integration/src/test/resources/presto-coordinator-config.properties
index 1de9c1a..03a9ad1
--- a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
+++ b/tests/integration/src/test/resources/presto-coordinator-config.properties
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,14 +17,28 @@
 # under the License.
 #
 
-bin/set_python_version.sh
+coordinator=true
 
-bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \
-    bin/apply-config-from-env.py conf/pulsar_env.sh
+node.id=ffffffff-ffff-ffff-ffff-ffffffffffff
+node.environment=test
+http-server.http.port=8081
 
-if [ -z "$NO_AUTOSTART" ]; then
-    sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/presto_worker.conf
-fi
+discovery-server.enabled=true
+discovery.uri=http://presto-worker:8081
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+exchange.http-client.max-connections=1000
+exchange.http-client.max-connections-per-server=1000
+exchange.http-client.connect-timeout=1m
+exchange.http-client.idle-timeout=1m
+
+scheduler.http-client.max-connections=1000
+scheduler.http-client.max-connections-per-server=1000
+scheduler.http-client.connect-timeout=1m
+scheduler.http-client.idle-timeout=1m
+
+query.client.timeout=5m
+query.min-expire-age=30m
+
+presto.version=testversion
+
+node-scheduler.include-coordinator=true
diff --git a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh b/tests/integration/src/test/resources/presto-follow-worker-config.properties
old mode 100755
new mode 100644
similarity index 65%
copy from tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
copy to tests/integration/src/test/resources/presto-follow-worker-config.properties
index 1de9c1a..be39b35
--- a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
+++ b/tests/integration/src/test/resources/presto-follow-worker-config.properties
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,14 +17,13 @@
 # under the License.
 #
 
-bin/set_python_version.sh
+coordinator=false
 
-bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \
-    bin/apply-config-from-env.py conf/pulsar_env.sh
+node.environment=test
+http-server.http.port=8081
+discovery.uri=http://presto-worker:8081
 
-if [ -z "$NO_AUTOSTART" ]; then
-    sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/presto_worker.conf
-fi
+query.client.timeout=5m
+query.min-expire-age=30m
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+presto.version=testversion
diff --git a/tests/integration/src/test/resources/pulsar-sql.xml b/tests/integration/src/test/resources/pulsar-sql.xml
index 46762f6..bedc443 100644
--- a/tests/integration/src/test/resources/pulsar-sql.xml
+++ b/tests/integration/src/test/resources/pulsar-sql.xml
@@ -23,6 +23,7 @@
     <test name="pulsar-sql-test-suite" preserve-order="true" >
         <classes>
             <class name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
+            <class name="org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage" />
         </classes>
     </test>
 </suite>