You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 12:35:07 UTC

[pulsar] branch master updated: [sql] Upgrade PrestoSQL to the first Trino version (#16683)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47b6611e340 [sql] Upgrade PrestoSQL to the first Trino version (#16683)
47b6611e340 is described below

commit 47b6611e340cb221086ca450592d44088d8eff6f
Author: tison <wa...@gmail.com>
AuthorDate: Mon Aug 8 20:34:52 2022 +0800

    [sql] Upgrade PrestoSQL to the first Trino version (#16683)
---
 bin/pulsar                                         |   2 +-
 build/run_unit_group.sh                            |   2 +-
 conf/presto/log.properties                         |   4 +-
 pom.xml                                            |   6 +-
 pulsar-sql/pom.xml                                 |   2 +-
 pulsar-sql/presto-distribution/LICENSE             | 160 ++++++++++-----------
 pulsar-sql/presto-distribution/pom.xml             |  16 +--
 .../src/main/resources/launcher.properties         |   2 +-
 pulsar-sql/presto-pulsar/pom.xml                   |  30 ++--
 .../org/apache/pulsar/sql/presto/PulsarAuth.java   |  16 +--
 .../pulsar/sql/presto/PulsarColumnHandle.java      |   6 +-
 .../pulsar/sql/presto/PulsarColumnMetadata.java    |   6 +-
 .../apache/pulsar/sql/presto/PulsarConnector.java  |  18 +--
 .../pulsar/sql/presto/PulsarConnectorFactory.java  |   8 +-
 .../pulsar/sql/presto/PulsarConnectorModule.java   |   8 +-
 .../presto/PulsarDispatchingRowDecoderFactory.java |   6 +-
 .../sql/presto/PulsarFieldValueProviders.java      |  16 +--
 .../pulsar/sql/presto/PulsarHandleResolver.java    |  12 +-
 .../pulsar/sql/presto/PulsarInternalColumn.java    |  10 +-
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  42 +++---
 .../org/apache/pulsar/sql/presto/PulsarPlugin.java |   4 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  16 +--
 .../apache/pulsar/sql/presto/PulsarRecordSet.java  |   6 +-
 .../pulsar/sql/presto/PulsarRecordSetProvider.java |  12 +-
 .../apache/pulsar/sql/presto/PulsarRowDecoder.java |   4 +-
 .../pulsar/sql/presto/PulsarRowDecoderFactory.java |   4 +-
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |   8 +-
 .../pulsar/sql/presto/PulsarSplitManager.java      |  61 ++++----
 .../pulsar/sql/presto/PulsarTableHandle.java       |   4 +-
 .../pulsar/sql/presto/PulsarTableLayoutHandle.java |   6 +-
 .../pulsar/sql/presto/PulsarTransactionHandle.java |   2 +-
 .../decoder/avro/PulsarAvroColumnDecoder.java      | 113 ++++++++-------
 .../presto/decoder/avro/PulsarAvroRowDecoder.java  |  12 +-
 .../decoder/avro/PulsarAvroRowDecoderFactory.java  |  75 +++++-----
 .../decoder/json/PulsarJsonFieldDecoder.java       | 150 ++++++++++---------
 .../presto/decoder/json/PulsarJsonRowDecoder.java  |   6 +-
 .../decoder/json/PulsarJsonRowDecoderFactory.java  |  60 ++++----
 .../primitive/PulsarPrimitiveRowDecoder.java       |  47 +++---
 .../PulsarPrimitiveRowDecoderFactory.java          |  34 ++---
 .../PulsarProtobufNativeColumnDecoder.java         |  66 ++++-----
 .../PulsarProtobufNativeRowDecoder.java            |  10 +-
 .../PulsarProtobufNativeRowDecoderFactory.java     |  46 +++---
 ...io.prestosql.spi.Plugin => io.trino.spi.Plugin} |   0
 .../pulsar/sql/presto/TestCacheSizeAllocator.java  |  12 +-
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   |  26 ++--
 .../pulsar/sql/presto/TestPulsarConnector.java     |  12 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  24 ++--
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  10 +-
 .../pulsar/sql/presto/TestPulsarSplitManager.java  |  28 ++--
 .../pulsar/sql/presto/TestReadChunkedMessages.java |  10 +-
 .../sql/presto/decoder/AbstractDecoderTester.java  |  14 +-
 .../pulsar/sql/presto/decoder/DecoderTestUtil.java |  22 +--
 .../presto/decoder/avro/AvroDecoderTestUtil.java   |   6 +-
 .../sql/presto/decoder/avro/TestAvroDecoder.java   |  47 +++---
 .../presto/decoder/json/JsonDecoderTestUtil.java   |   6 +-
 .../sql/presto/decoder/json/TestJsonDecoder.java   |  65 ++++++---
 .../primitive/PrimitiveDecoderTestUtil.java        |   4 +-
 .../decoder/primitive/TestPrimitiveDecoder.java    |  59 ++++----
 .../ProtobufNativeDecoderTestUtil.java             |  12 +-
 .../protobufnative/TestProtobufNativeDecoder.java  |  43 +++---
 tests/integration/pom.xml                          |   6 +-
 .../integration/presto/TestPulsarSQLBase.java      |   1 +
 .../integration/suites/PulsarSQLTestSuite.java     |   2 +-
 63 files changed, 781 insertions(+), 746 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index 205659f8d18..1fe2913f15c 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -391,7 +391,7 @@ elif [ $COMMAND == "compact-topic" ]; then
     exec $JAVA $OPTS org.apache.pulsar.compaction.CompactorTool --broker-conf $PULSAR_BROKER_CONF $@
 elif [ $COMMAND == "sql" ]; then
     check_presto_libraries
-    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.prestosql.cli.Presto --server localhost:8081 "${@}"
+    exec $JAVA -cp "${PRESTO_HOME}/lib/*" io.trino.cli.Trino --server localhost:8081 "${@}"
 elif [ $COMMAND == "sql-worker" ]; then
     check_presto_libraries
     exec python3 ${PRESTO_HOME}/bin/launcher.py --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 65b2a5baec7..4804c236efc 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -163,7 +163,7 @@ function test_group_pulsar_io() {
     echo "::endgroup::"
 
     echo "::group::Running pulsar-sql tests"
-    mvn_test --install -Ppulsar-sql-tests,-main
+    mvn_test --install -Ppulsar-sql-tests,-main -DtestForkCount=1
     echo "::endgroup::"
 }
 
diff --git a/conf/presto/log.properties b/conf/presto/log.properties
index 000961f5d93..4a796b0e190 100644
--- a/conf/presto/log.properties
+++ b/conf/presto/log.properties
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-io.prestosql=INFO
+io.trino=INFO
 com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory=WARN
 com.ning.http.client=WARN
-io.prestosql.server.PluginManager=DEBUG
+io.trino.server.PluginManager=DEBUG
diff --git a/pom.xml b/pom.xml
index cbc447c0c3c..85d69232330 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@ flexible messaging model and an intuitive client API.</description>
     <rabbitmq-client.version>5.5.3</rabbitmq-client.version>
     <aws-sdk.version>1.12.262</aws-sdk.version>
     <avro.version>1.10.2</avro.version>
-    <joda.version>2.10.5</joda.version>
+    <joda.version>2.10.10</joda.version>
     <jclouds.version>2.5.0</jclouds.version>
     <guice.version>5.1.0</guice.version>
     <sqlite-jdbc.version>3.36.0.3</sqlite-jdbc.version>
@@ -171,7 +171,7 @@ flexible messaging model and an intuitive client API.</description>
     <json-smart.version>2.4.7</json-smart.version>
     <opensearch.version>1.2.4</opensearch.version>
     <elasticsearch-java.version>8.1.0</elasticsearch-java.version>
-    <presto.version>334</presto.version>
+    <trino.version>363</trino.version>
     <scala.binary.version>2.13</scala.binary.version>
     <scala-library.version>2.13.6</scala-library.version>
     <debezium.version>1.7.2.Final</debezium.version>
@@ -1636,7 +1636,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>pulsar-client-cpp/generated/**</exclude>
             <!-- This is a text property file that contains just a class name -->
             <exclude>**/META-INF/services/com.scurrilous.circe.HashProvider</exclude>
-            <exclude>**/META-INF/services/io.prestosql.spi.Plugin</exclude>
+            <exclude>**/META-INF/services/io.trino.spi.Plugin</exclude>
 
             <!-- Django generated code -->
             <exclude>**/django/stats/migrations/*.py</exclude>
diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml
index 3e682619e56..b546b7c6b53 100644
--- a/pulsar-sql/pom.xml
+++ b/pulsar-sql/pom.xml
@@ -36,7 +36,7 @@
         <okhttp3.version>3.14.9</okhttp3.version>
         <!-- use okio version that matches the okhttp3 version -->
         <okio.version>1.17.2</okio.version>
-        <airlift.version>0.199</airlift.version>
+        <airlift.version>208</airlift.version>
     </properties>
 
     <dependencyManagement>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 5952ec6e48b..7bdd5eb290e 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -231,7 +231,6 @@ The Apache Software License, Version 2.0
     - commons-compress-1.21.jar
     - commons-lang3-3.11.jar
  * Netty
-    - netty-3.10.6.Final.jar
     - netty-buffer-4.1.77.Final.jar
     - netty-codec-4.1.77.Final.jar
     - netty-codec-dns-4.1.77.Final.jar
@@ -271,7 +270,7 @@ The Apache Software License, Version 2.0
     - jetcd-core-0.5.11.jar
 
  * Joda Time
-    - joda-time-2.10.5.jar
+    - joda-time-2.10.10.jar
     - failsafe-2.4.4.jar
   * Jetty
     - http2-client-9.4.48.v20220622.jar
@@ -290,37 +289,36 @@ The Apache Software License, Version 2.0
     - jetty-servlet-9.4.48.v20220622.jar
     - jetty-util-9.4.48.v20220622.jar
     - jetty-util-ajax-9.4.48.v20220622.jar
+  * Byte Buddy
+    - byte-buddy-1.11.13.jar
   * Apache BVal
-    - bval-jsr-2.0.0.jar
+    - bval-jsr-2.0.5.jar
   * Bytecode
     - bytecode-1.2.jar
-  * CGLIB Nodep
-    - cglib-nodep-3.3.0.jar
   * Airlift
     - aircompressor-0.20.jar
-    - airline-0.8.jar
-    - bootstrap-0.199.jar
-    - concurrent-0.199.jar
-    - configuration-0.199.jar
-    - discovery-0.199.jar
-    - discovery-server-1.29.jar
-    - event-0.199.jar
-    - http-client-0.199.jar
-    - http-server-0.199.jar
-    - jmx-0.199.jar
-    - jmx-http-0.199.jar
-    - jmx-http-rpc-0.199.jar
+    - bootstrap-208.jar
+    - concurrent-208.jar
+    - configuration-208.jar
+    - discovery-208.jar
+    - discovery-server-1.30.jar
+    - event-208.jar
+    - event-http-208.jar
+    - http-client-208.jar
+    - http-server-208.jar
+    - jmx-208.jar
+    - jmx-http-208.jar
+    - jmx-http-rpc-208.jar
     - joni-2.1.5.3.jar
-    - json-0.199.jar
-    - log-0.199.jar
-    - log-manager-0.199.jar
-    - node-0.199.jar
+    - json-208.jar
+    - log-208.jar
+    - log-manager-208.jar
+    - node-208.jar
     - parameternames-1.4.jar
-    - resolver-1.5.jar
-    - security-0.199.jar
-    - slice-0.38.jar
-    - stats-0.199.jar
-    - trace-token-0.199.jar
+    - security-208.jar
+    - slice-0.39.jar
+    - stats-208.jar
+    - trace-token-208.jar
     - units-1.6.jar
    * Apache HTTP Client
     - httpclient-4.5.13.jar
@@ -328,9 +326,9 @@ The Apache Software License, Version 2.0
   * Error Prone Annotations
     - error_prone_annotations-2.5.1.jar
   * Esri Geometry API For Java
-    - esri-geometry-api-2.2.2.jar
+    - esri-geometry-api-2.2.4.jar
   * Failsafe
-    - failsafe-2.0.1.jar
+    - failsafe-2.4.0.jar
   * Fastutil
     - fastutil-8.3.0.jar
   * J2ObjC Annotations
@@ -340,41 +338,27 @@ The Apache Software License, Version 2.0
   * Jmxutils
     - jmxutils-1.21.jar
   * LevelDB
-    - leveldb-0.10.jar
-    - leveldb-api-0.10.jar
+    - leveldb-0.12.jar
+    - leveldb-api-0.12.jar
+  * Log4j
+    - log4j-api-2.17.1.jar
+    - log4j-core-2.17.1.jar
+    - log4j-slf4j-impl-2.17.1.jar
   * Log4j implemented over SLF4J
     - log4j-over-slf4j-1.7.32.jar
   * Lucene Common Analyzers
     - lucene-analyzers-common-8.4.1.jar
     - lucene-core-8.4.1.jar
-  * Maven
-    - maven-aether-provider-3.0.5.jar
-    - maven-artifact-3.0.5.jar
-    - maven-core-3.0.5.jar
-    - maven-compat-3.0.5.jar
-    - maven-embedder-3.0.5.jar
-    - maven-model-3.0.5.jar
-    - maven-model-builder-3.0.5.jar
-    - maven-plugin-api-3.0.5.jar
-    - maven-repository-metadata-3.0.5.jar
-    - maven-settings-3.0.5.jar
-    - maven-settings-builder-3.0.5.jar
-	- wagon-provider-api-2.4.jar
+  * PicoCLI
+    - picocli-4.6.1.jar
+  * RxJava
+    - rxjava-3.0.1.jar
   * OkHttp
+    - logging-interceptor-3.14.9.jar
     - okhttp-3.14.9.jar
     - okhttp-urlconnection-3.14.9.jar
   * OpenCSV
     - opencsv-2.3.jar
-  * Plexus
-    - plexus-cipher-1.7.jar
-    - plexus-classworlds-2.4.jar
-    - plexus-component-annotations-1.5.5.jar
-    - plexus-container-default-1.5.5.jar
-    - plexus-interpolation-1.14.jar
-    - plexus-sec-dispatcher-1.3.jar
-    - plexus-utils-2.0.6.jar
-  * Apache XBean :: Reflect
-    - xbean-reflect-3.4.jar
   * Avro
     - avro-1.10.2.jar
     - avro-protobuf-1.10.2.jar
@@ -388,26 +372,27 @@ The Apache Software License, Version 2.0
   * JCommander
     - jcommander-1.82.jar
   * FindBugs JSR305
+    - jsr305-2.0.1.jar
     - jsr305-3.0.2.jar
   * Objenesis
     - objenesis-2.6.jar
   * Okio
     - okio-1.17.2.jar
-  * Presto
-    - presto-array-334.jar
-    - presto-cli-334.jar
-    - presto-client-334.jar
-    - presto-geospatial-toolkit-334.jar
-    - presto-main-334.jar
-    - presto-matching-334.jar
-    - presto-memory-context-334.jar
-    - presto-parser-334.jar
-    - presto-plugin-toolkit-334.jar
-    - presto-server-main-334.jar
-    - presto-spi-334.jar
-    - presto-record-decoder-334.jar
+  * Trino
+    - trino-array-363.jar
+    - trino-cli-363.jar
+    - trino-client-363.jar
+    - trino-geospatial-toolkit-363.jar
+    - trino-main-363.jar
+    - trino-matching-363.jar
+    - trino-memory-context-363.jar
+    - trino-parser-363.jar
+    - trino-plugin-toolkit-363.jar
+    - trino-server-main-363.jar
+    - trino-spi-363.jar
+    - trino-record-decoder-363.jar
   * RocksDB JNI
-    - rocksdbjni-6.10.2.jar
+    - rocksdbjni-6.29.4.1.jar
   * SnakeYAML
     - snakeyaml-1.30.jar
   * Bean Validation API
@@ -418,6 +403,7 @@ The Apache Software License, Version 2.0
     - metrics-core-4.1.12.1.jar
     - metrics-graphite-4.1.12.1.jar
     - metrics-jvm-4.1.12.1.jar
+    - metrics-jmx-4.1.12.1.jar
   * Prometheus
     - simpleclient-0.16.0.jar
     - simpleclient_common-0.16.0.jar
@@ -430,7 +416,6 @@ The Apache Software License, Version 2.0
   * JCTools
     - jctools-core-2.1.2.jar
   * Asynchronous Http Client
-    - async-http-client-1.6.5.jar
     - async-http-client-2.12.1.jar
     - async-http-client-netty-utils-2.12.1.jar
   * Apache Bookkeeper
@@ -449,13 +434,15 @@ The Apache Software License, Version 2.0
   * Apache Commons
     - commons-cli-1.5.0.jar
     - commons-codec-1.15.jar
-    - commons-collections4-4.1.jar
+    - commons-collections4-4.4.jar
     - commons-configuration-1.10.jar
     - commons-io-2.8.0.jar
     - commons-lang-2.6.jar
     - commons-logging-1.2.jar
   * GSON
     - gson-2.8.9.jar
+  * JSON Simple
+    - json-simple-1.1.1.jar
   * Snappy
     - snappy-java-1.1.8.4.jar
   * Jackson
@@ -480,6 +467,11 @@ The Apache Software License, Version 2.0
     - perfmark-api-0.19.0.jar
   * Annotations
     - auto-service-annotations-1.0.jar
+  * RabbitMQ Java Client
+    - amqp-client-5.5.3.jar
+  * Stream Lib
+    - stream-2.9.5.jar
+
 
 Protocol Buffers License
  * Protocol Buffers
@@ -493,16 +485,17 @@ BSD 3-clause "New" or "Revised" License
     - dsl-json-1.8.4.jar
 
 BSD License
- * ANTLR 4 Runtime -- antlr4-runtime-4.7.1.jar
+ * ANTLR 4 Runtime
+    - antlr4-runtime-4.9.2.jar
  * ASM, a very small and fast Java bytecode manipulation framework
     - asm-6.2.1.jar
     - asm-analysis-6.2.1.jar
     - asm-tree-6.2.1.jar
     - asm-util-6.2.1.jar
  * JLine
-   - jline-reader-3.12.1.jar
-   - jline-terminal-3.12.1.jar
-   - jline-terminal-jna-3.12.1.jar
+   - jline-reader-3.17.1.jar
+   - jline-terminal-3.17.1.jar
+   - jline-terminal-jna-3.17.1.jar
 
 MIT License
  * PCollections
@@ -517,6 +510,9 @@ MIT License
  * Annotations
    - animal-sniffer-annotations-1.19.jar
    - annotations-4.1.1.4.jar
+ * ScribeJava
+   - scribejava-apis-6.9.0.jar
+   - scribejava-core-6.9.0.jar
 
 CDDL - 1.0
  * OSGi Resource Locator
@@ -532,7 +528,7 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
    - hk2-utils-2.6.1.jar
    - aopalliance-repackaged-2.6.1.jar
  * Jersey
-    - jaxrs-0.199.jar
+    - jaxrs-208.jar
     - jersey-client-2.34.jar
     - jersey-container-servlet-2.34.jar
     - jersey-container-servlet-core-2.34.jar
@@ -544,18 +540,16 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
     - jersey-common-2.34.jar
  * JAXB
     - jaxb-api-2.3.1.jar
+    - jaxb-runtime-2.3.4.jar
+    - txw2-2.3.4.jar
+ Eclipse Distribution License 1.0 -- licenses/LICENSE-EDL-1.0.txt
+  * istack-commons-runtime-3.0.12.jar
+  * jts-io-common-1.16.1.jar
 
- Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt
-  * Aether
-     - aether-api-1.13.1.jar
-     - aether-connector-asynchttpclient-1.13.1.jar
-     - aether-connector-file-1.13.1.jar
-     - aether-impl-1.13.1.jar
-     - aether-spi-1.13.1.jar
-     - aether-util-1.13.1.jar
+ Eclipse Public License 1.0 -- licenses/LICENSE-EPL-1.0.txt
   * JTS Core
      - jts-core-1.16.1.jar
-  *JGraphT Core
+  * JGraphT Core
      - jgrapht-core-0.9.0.jar
   * Logback Core Module
     - logback-core-1.2.3.jar
diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml
index a92f0f37651..c77945ac5c3 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -36,10 +36,6 @@
     <jersey.version>2.34</jersey.version>
     <objenesis.version>2.6</objenesis.version>
     <objectsize.version>0.0.12</objectsize.version>
-    <jackson.version>2.13.2</jackson.version>
-    <!--fix Security Vulnerabilities-->
-    <!--https://www.cvedetails.com/vulnerability-list/vendor_id-15866/product_id-42991/Fasterxml-Jackson-databind.html-->
-    <jackson.databind.version>2.13.2.1</jackson.databind.version>
     <maven.version>3.0.5</maven.version>
     <guava.version>31.0.1-jre</guava.version>
     <asynchttpclient.version>2.12.1</asynchttpclient.version>
@@ -80,9 +76,9 @@
     </dependency>
 
     <dependency>
-      <groupId>io.prestosql</groupId>
-      <artifactId>presto-server-main</artifactId>
-      <version>${presto.version}</version>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-server-main</artifactId>
+      <version>${trino.version}</version>
       <exclusions>
         <!-- exclude openjdk because of GPL license -->
         <exclusion>
@@ -105,9 +101,9 @@
     </dependency>
 
     <dependency>
-      <groupId>io.prestosql</groupId>
-      <artifactId>presto-cli</artifactId>
-      <version>${presto.version}</version>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-cli</artifactId>
+      <version>${trino.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-sql/presto-distribution/src/main/resources/launcher.properties b/pulsar-sql/presto-distribution/src/main/resources/launcher.properties
index 0f997c45954..a8649925414 100644
--- a/pulsar-sql/presto-distribution/src/main/resources/launcher.properties
+++ b/pulsar-sql/presto-distribution/src/main/resources/launcher.properties
@@ -17,5 +17,5 @@
 # under the License.
 #
 
-main-class=io.prestosql.server.PrestoServer
+main-class=io.trino.server.TrinoServer
 process-name=pulsar-presto-distribution
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index 91548128624..99b94e2613e 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -79,16 +79,16 @@
         </dependency>
 
         <dependency>
-            <groupId>io.prestosql</groupId>
-            <artifactId>presto-plugin-toolkit</artifactId>
-            <version>${presto.version}</version>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-plugin-toolkit</artifactId>
+            <version>${trino.version}</version>
         </dependency>
 
         <!-- Presto SPI -->
         <dependency>
-            <groupId>io.prestosql</groupId>
-            <artifactId>presto-spi</artifactId>
-            <version>${presto.version}</version>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <version>${trino.version}</version>
             <scope>provided</scope>
         </dependency>
 
@@ -99,9 +99,9 @@
         </dependency>
 
         <dependency>
-            <groupId>io.prestosql</groupId>
-            <artifactId>presto-record-decoder</artifactId>
-            <version>${presto.version}</version>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-record-decoder</artifactId>
+            <version>${trino.version}</version>
         </dependency>
 
         <dependency>
@@ -118,16 +118,16 @@
         </dependency>
 
         <dependency>
-            <groupId>io.prestosql</groupId>
-            <artifactId>presto-main</artifactId>
-            <version>${presto.version}</version>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <version>${trino.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>io.prestosql</groupId>
-            <artifactId>presto-testing</artifactId>
-            <version>${presto.version}</version>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing</artifactId>
+            <version>${trino.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
index b94e4a27611..b51ea48900b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
-import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import io.airlift.log.Logger;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ConnectorSession;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
@@ -82,7 +82,7 @@ public class PulsarAuth {
         }
         Map<String, String> extraCredentials = session.getIdentity().getExtraCredentials();
         if (extraCredentials.isEmpty()) { // the extraCredentials won't be null
-            throw new PrestoException(QUERY_REJECTED,
+            throw new TrinoException(QUERY_REJECTED,
                     String.format(
                             "Failed to check the authorization for topic %s: The credential information is empty.",
                             topic));
@@ -90,7 +90,7 @@ public class PulsarAuth {
         String authMethod = extraCredentials.get(CREDENTIALS_AUTH_PLUGIN);
         String authParams = extraCredentials.get(CREDENTIALS_AUTH_PARAMS);
         if (StringUtils.isEmpty(authMethod) || StringUtils.isEmpty(authParams)) {
-            throw new PrestoException(QUERY_REJECTED,
+            throw new TrinoException(QUERY_REJECTED,
                     String.format(
                             "Failed to check the authorization for topic %s: Required credential parameters are "
                                     + "missing. Please specify the auth-method and auth-params in the extra "
@@ -117,10 +117,10 @@ public class PulsarAuth {
                 log.debug("Check the authorization for the topic %s successfully.", topic);
             }
         } catch (PulsarClientException.AuthenticationException | PulsarClientException.AuthorizationException e) {
-            throw new PrestoException(PERMISSION_DENIED,
+            throw new TrinoException(PERMISSION_DENIED,
                     String.format("Failed to access topic %s: %s", topic, e.getLocalizedMessage()));
         } catch (IOException e) {
-            throw new PrestoException(QUERY_REJECTED,
+            throw new TrinoException(QUERY_REJECTED,
                     String.format("Failed to check authorization for topic %s: %s", topic, e.getLocalizedMessage()));
         }
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index fcb952bb3eb..0da9377b5ff 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -22,9 +22,9 @@ import static java.util.Objects.requireNonNull;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.Type;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.Type;
 import java.util.Objects;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 19ed33fe1ca..b50db5bdc8b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.Type;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.Type;
 import java.util.Objects;
 
 /**
@@ -128,7 +128,7 @@ public class PulsarColumnMetadata extends ColumnMetadata {
 
     /**
      * Decoder extra info for {@link org.apache.pulsar.sql.presto.PulsarColumnHandle}
-     * used by {@link io.prestosql.decoder.RowDecoder}.
+     * used by {@link io.trino.decoder.RowDecoder}.
      */
     public static class DecoderExtraInfo {
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 33852ba0ce6..12536f91a04 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -18,18 +18,18 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import static io.prestosql.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static io.prestosql.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
 import static java.util.Objects.requireNonNull;
 import io.airlift.bootstrap.LifeCycleManager;
 import io.airlift.log.Logger;
-import io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
-import io.prestosql.spi.connector.Connector;
-import io.prestosql.spi.connector.ConnectorMetadata;
-import io.prestosql.spi.connector.ConnectorRecordSetProvider;
-import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
-import io.prestosql.spi.transaction.IsolationLevel;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.transaction.IsolationLevel;
 import javax.inject.Inject;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
index f495cb4d43f..ffeac71ce78 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
@@ -24,10 +24,10 @@ import com.google.inject.Injector;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.json.JsonModule;
 import io.airlift.log.Logger;
-import io.prestosql.spi.connector.Connector;
-import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.connector.ConnectorFactory;
-import io.prestosql.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.connector.ConnectorHandleResolver;
 import java.util.Map;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
index f5cf78e4153..1274b9175cf 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
@@ -26,10 +26,10 @@ import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Scopes;
-import io.prestosql.decoder.DecoderModule;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeId;
-import io.prestosql.spi.type.TypeManager;
+import io.trino.decoder.DecoderModule;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeId;
+import io.trino.spi.type.TypeManager;
 import javax.inject.Inject;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
index e70cdd2efe2..8434a3c4d8e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.sql.presto;
 import static java.lang.String.format;
 import com.google.inject.Inject;
 import io.airlift.log.Logger;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.TypeManager;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.TypeManager;
 import java.util.List;
 import java.util.Set;
 import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarFieldValueProviders.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarFieldValueProviders.java
index c1ef3c8cc6c..38f479c1e6d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarFieldValueProviders.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarFieldValueProviders.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import io.prestosql.decoder.FieldValueProvider;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.type.Timestamps;
 
 /**
  * custom FieldValueProvider for Pulsar.
@@ -40,17 +41,14 @@ public class PulsarFieldValueProviders {
     }
 
     /**
-     * FieldValueProvider for Time (Data,Timstamp etc.) with indicate Null instead of longValueProvider.
-     * @param value
-     * @param isNull
-     * @return
+     * FieldValueProvider for Time (Data, Timestamp etc.) with indicate Null instead of longValueProvider.
      */
-    public static FieldValueProvider timeValueProvider(long value, boolean isNull) {
+    public static FieldValueProvider timeValueProvider(long millis, boolean isNull) {
         return new FieldValueProvider() {
             @Override
-                public long getLong() {
-                    return value;
-                }
+            public long getLong() {
+                return millis * Timestamps.MICROSECONDS_PER_MILLISECOND;
+            }
 
             @Override
             public boolean isNull() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
index ad4dd8101fd..b7eddad2889 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java
@@ -20,12 +20,12 @@ package org.apache.pulsar.sql.presto;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorHandleResolver;
-import io.prestosql.spi.connector.ConnectorSplit;
-import io.prestosql.spi.connector.ConnectorTableHandle;
-import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorHandleResolver;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableLayoutHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
 
 /**
  * This class helps to resolve classes for the Presto connector.
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 6a81ee074bf..981dc3280e7 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -23,11 +23,11 @@ import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarcharType;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 6a260cc9b14..f494b63e139 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
-import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import static io.trino.spi.StandardErrorCode.NOT_FOUND;
+import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
 import static java.util.Objects.requireNonNull;
 import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
 import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
@@ -32,20 +32,20 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.connector.ConnectorMetadata;
-import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.connector.ConnectorTableHandle;
-import io.prestosql.spi.connector.ConnectorTableLayout;
-import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
-import io.prestosql.spi.connector.ConnectorTableLayoutResult;
-import io.prestosql.spi.connector.ConnectorTableMetadata;
-import io.prestosql.spi.connector.Constraint;
-import io.prestosql.spi.connector.SchemaTableName;
-import io.prestosql.spi.connector.SchemaTablePrefix;
-import io.prestosql.spi.connector.TableNotFoundException;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableLayout;
+import io.trino.spi.connector.ConnectorTableLayoutHandle;
+import io.trino.spi.connector.ConnectorTableLayoutResult;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableNotFoundException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -119,7 +119,7 @@ public class PulsarMetadata implements ConnectorMetadata {
             }
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized");
+                throw new TrinoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized");
             }
             throw new RuntimeException("Failed to get schemas from pulsar: "
                     + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
@@ -186,7 +186,7 @@ public class PulsarMetadata implements ConnectorMetadata {
                         log.warn("Schema " + schemaNameOrNull + " does not exsit");
                         return builder.build();
                     } else if (e.getStatusCode() == 401) {
-                        throw new PrestoException(QUERY_REJECTED,
+                        throw new TrinoException(QUERY_REJECTED,
                                 String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull));
                     }
                     throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
@@ -302,7 +302,7 @@ public class PulsarMetadata implements ConnectorMetadata {
                 schemaInfo = PulsarSqlSchemaInfoProvider.defaultSchema();
 
             } else if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
+                throw new TrinoException(QUERY_REJECTED,
                         String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized",
                                 topicName));
             } else {
@@ -397,9 +397,9 @@ public class PulsarMetadata implements ConnectorMetadata {
                     .collect(Collectors.toSet());
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
-                throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
+                throw new TrinoException(NOT_FOUND, "Schema " + namespace + " does not exist");
             } else if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
+                throw new TrinoException(QUERY_REJECTED,
                         String.format("Failed to get topics in schema %s: Unauthorized", namespace));
             }
             throw new RuntimeException("Failed to get topics in schema " + namespace
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
index af76e72c923..63a90727275 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.sql.presto;
 
 import com.google.common.collect.ImmutableList;
-import io.prestosql.spi.Plugin;
-import io.prestosql.spi.connector.ConnectorFactory;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.ConnectorFactory;
 
 /**
  * Implementation of the Pulsar plugin for Pesto.
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 558f6b47e9d..b4391daf15d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.sql.presto;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static io.prestosql.decoder.FieldValueProviders.bytesValueProvider;
-import static io.prestosql.decoder.FieldValueProviders.longValueProvider;
+import static io.trino.decoder.FieldValueProviders.bytesValueProvider;
+import static io.trino.decoder.FieldValueProviders.longValueProvider;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -31,12 +31,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.RecordCursor;
-import io.prestosql.spi.type.Type;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.type.Type;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
index 0305c272c15..45fb8df3c30 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.sql.presto;
 
 import static java.util.Objects.requireNonNull;
 import com.google.common.collect.ImmutableList;
-import io.prestosql.spi.connector.RecordCursor;
-import io.prestosql.spi.connector.RecordSet;
-import io.prestosql.spi.type.Type;
+import io.trino.spi.connector.RecordCursor;
+import io.trino.spi.connector.RecordSet;
+import io.trino.spi.type.Type;
 import java.util.List;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
index cf6981000be..08058974193 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java
@@ -20,12 +20,12 @@ package org.apache.pulsar.sql.presto;
 
 import static java.util.Objects.requireNonNull;
 import com.google.common.collect.ImmutableList;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorRecordSetProvider;
-import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.connector.ConnectorSplit;
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
-import io.prestosql.spi.connector.RecordSet;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.RecordSet;
 import java.util.List;
 import javax.inject.Inject;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoder.java
index b736a3a73d4..a1de68d97b6 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoder.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.sql.presto;
 
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
 import java.util.Map;
 import java.util.Optional;
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoderFactory.java
index c2a2b73471e..0a799f447e8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRowDecoderFactory.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.connector.ColumnMetadata;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
 import java.util.List;
 import java.util.Set;
 import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 03a6b771bba..56d9d02da4f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
-import io.prestosql.spi.HostAddress;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorSplit;
-import io.prestosql.spi.predicate.TupleDomain;
+import io.trino.spi.HostAddress;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.predicate.TupleDomain;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index ee644328ea3..42c29fd7ab6 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.sql.presto;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
 import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
 import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
@@ -27,19 +27,20 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import io.airlift.log.Logger;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.connector.ConnectorSplitManager;
-import io.prestosql.spi.connector.ConnectorSplitSource;
-import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
-import io.prestosql.spi.connector.FixedSplitSource;
-import io.prestosql.spi.predicate.Domain;
-import io.prestosql.spi.predicate.Range;
-import io.prestosql.spi.predicate.TupleDomain;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTableLayoutHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.FixedSplitSource;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.Utils;
 import java.io.IOException;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -76,7 +77,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
 
     private static final Logger log = Logger.get(PulsarSplitManager.class);
 
-    private ObjectMapper objectMapper = new ObjectMapper();
+    private final ObjectMapper objectMapper = new ObjectMapper();
 
     @Inject
     public PulsarSplitManager(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) {
@@ -111,7 +112,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                     String.format("%s/%s", namespace, tableHandle.getTopicName()));
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
+                throw new TrinoException(QUERY_REJECTED,
                         String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
                                 namespace, tableHandle.getTopicName()));
             } else if (e.getStatusCode() == 404) {
@@ -193,7 +194,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
             numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
+                throw new TrinoException(QUERY_REJECTED,
                     String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
             }
 
@@ -207,13 +208,15 @@ public class PulsarSplitManager implements ConnectorSplitManager {
             if (domain != null) {
                 domain.getValues().getValuesProcessor().consume(
                     ranges -> domain.getValues().getRanges().getOrderedRanges().forEach(range -> {
-                        Integer low = 0;
-                        Integer high = numPartitions;
-                        if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
-                            low = range.getLow().getValueBlock().get().getInt(0, 0);
+                        int low = 0;
+                        int high = numPartitions;
+                        if (range.getLowValue().isPresent()) {
+                            Block block = Utils.nativeValueToBlock(range.getType(), range.getLowBoundedValue());
+                            low = block.getInt(0, 0);
                         }
-                        if (!range.getHigh().isLowerUnbounded() && range.getHigh().getValueBlock().isPresent()) {
-                            high = range.getHigh().getValueBlock().get().getInt(0, 0);
+                        if (range.getHighValue().isPresent()) {
+                            Block block = Utils.nativeValueToBlock(range.getType(), range.getHighBoundedValue());
+                            high = block.getInt(0, 0);
                         }
                         for (int i = low; i <= high; i++) {
                             predicatePartitions.add(i);
@@ -274,7 +277,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
 
             long numEntries = readOnlyCursor.getNumberOfEntries();
             if (numEntries <= 0) {
-                return Collections.EMPTY_LIST;
+                return Collections.emptyList();
             }
 
             PredicatePushdownInfo predicatePushdownInfo = PredicatePushdownInfo.getPredicatePushdownInfo(
@@ -379,14 +382,14 @@ public class PulsarSplitManager implements ConnectorSplitManager {
 
                             Range range = domain.getValues().getRanges().getOrderedRanges().get(0);
 
-                            if (!range.getHigh().isUpperUnbounded()) {
-                                upperBoundTs = new Timestamp(range.getHigh().getValueBlock().get()
-                                        .getLong(0, 0)).getTime();
+                            if (!range.isHighUnbounded()) {
+                                Block block = Utils.nativeValueToBlock(range.getType(), range.getHighBoundedValue());
+                                upperBoundTs = block.getLong(0, 0) / 1000;
                             }
 
-                            if (!range.getLow().isLowerUnbounded()) {
-                                lowerBoundTs = new Timestamp(range.getLow().getValueBlock().get()
-                                        .getLong(0, 0)).getTime();
+                            if (!range.isLowUnbounded()) {
+                                Block block = Utils.nativeValueToBlock(range.getType(), range.getLowBoundedValue());
+                                lowerBoundTs = block.getLong(0, 0) / 1000;
                             }
 
                             PositionImpl overallStartPos;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
index 271d8884dde..a7938c57d43 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java
@@ -22,8 +22,8 @@ import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.prestosql.spi.connector.ConnectorTableHandle;
-import io.prestosql.spi.connector.SchemaTableName;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.SchemaTableName;
 import java.util.Objects;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
index 4ee0ea8aca0..95c03c0ee82 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.sql.presto;
 import static java.util.Objects.requireNonNull;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
-import io.prestosql.spi.predicate.TupleDomain;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorTableLayoutHandle;
+import io.trino.spi.predicate.TupleDomain;
 import java.util.Objects;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
index a80b98582a0..34b6f759c9a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.ConnectorTransactionHandle;
 
 /**
  * A handle for transactions.
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
index 0c57336d213..561946dc4a1 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroColumnDecoder.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.sql.presto.decoder.avro;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.slice.Slices.utf8Slice;
-import static io.prestosql.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
-import static io.prestosql.spi.StandardErrorCode.GENERIC_USER_ERROR;
-import static io.prestosql.spi.type.Varchars.truncateToLength;
+import static io.trino.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.type.Varchars.truncateToLength;
 import static java.lang.Float.floatToIntBits;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -31,30 +31,31 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.block.BlockBuilder;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.Decimals;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.RowType.Field;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimeType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.RowType.Field;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -65,11 +66,11 @@ import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 
 /**
- * Copy from {@link io.prestosql.decoder.avro.AvroColumnDecoder} (presto-record-decoder-345)
+ * Copy from {@link io.trino.decoder.avro.AvroColumnDecoder} (presto-record-decoder-345)
  * with A little bit pulsar's extensions.
- * 1) support {@link io.prestosql.spi.type.TimestampType},{@link io.prestosql.spi.type.DateType}DATE,
- *  * {@link io.prestosql.spi.type.TimeType}.
- * 2) support {@link io.prestosql.spi.type.RealType}.
+ * 1) support {@link io.trino.spi.type.TimestampType},{@link io.trino.spi.type.DateType}DATE,
+ *  * {@link io.trino.spi.type.TimeType}.
+ * 2) support {@link io.trino.spi.type.RealType}.
  */
 public class PulsarAvroColumnDecoder {
     private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
@@ -80,9 +81,9 @@ public class PulsarAvroColumnDecoder {
             BigintType.BIGINT,
             RealType.REAL,
             DoubleType.DOUBLE,
-            TimestampType.TIMESTAMP,
+            TimestampType.TIMESTAMP_MILLIS,
             DateType.DATE,
-            TimeType.TIME,
+            TimeType.TIME_MILLIS,
             VarbinaryType.VARBINARY);
 
     private final Type columnType;
@@ -106,7 +107,7 @@ public class PulsarAvroColumnDecoder {
             checkArgument(isSupportedType(columnType),
                     "Unsupported column type '%s' for column '%s'", columnType, columnName);
         } catch (IllegalArgumentException e) {
-            throw new PrestoException(GENERIC_USER_ERROR, e);
+            throw new TrinoException(GENERIC_USER_ERROR, e);
         }
     }
 
@@ -183,7 +184,7 @@ public class PulsarAvroColumnDecoder {
             if (value instanceof Double || value instanceof Float) {
                 return ((Number) value).doubleValue();
             }
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -193,7 +194,7 @@ public class PulsarAvroColumnDecoder {
             if (value instanceof Boolean) {
                 return (Boolean) value;
             }
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -201,7 +202,14 @@ public class PulsarAvroColumnDecoder {
         @Override
         public long getLong() {
             if (value instanceof Long || value instanceof Integer) {
-                return ((Number) value).longValue();
+                final long payload = ((Number) value).longValue();
+                if (TimestampType.TIMESTAMP_MILLIS.equals(columnType)) {
+                    return payload * Timestamps.MICROSECONDS_PER_MILLISECOND;
+                }
+                if (TimeType.TIME_MILLIS.equals(columnType)) {
+                    return payload * Timestamps.PICOSECONDS_PER_MILLISECOND;
+                }
+                return payload;
             }
 
             if (columnType instanceof RealType) {
@@ -215,7 +223,7 @@ public class PulsarAvroColumnDecoder {
                 return new BigInteger(bytes).longValue();
             }
 
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -251,7 +259,7 @@ public class PulsarAvroColumnDecoder {
             return Decimals.encodeUnscaledValue(bigInteger);
         }
 
-        throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+        throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
     }
@@ -304,11 +312,21 @@ public class PulsarAvroColumnDecoder {
             return;
         }
 
-        if ((value instanceof Integer || value instanceof Long)
-                && (type instanceof BigintType || type instanceof IntegerType
-                || type instanceof SmallintType || type instanceof TinyintType)) {
-            type.writeLong(blockBuilder, ((Number) value).longValue());
-            return;
+        if (value instanceof Integer || value instanceof Long) {
+            final long payload = ((Number) value).longValue();
+            if (type instanceof BigintType || type instanceof IntegerType
+                    || type instanceof SmallintType || type instanceof TinyintType) {
+                type.writeLong(blockBuilder, payload);
+                return;
+            }
+            if (TimestampType.TIMESTAMP_MILLIS.equals(type)) {
+                type.writeLong(blockBuilder, payload * Timestamps.MICROSECONDS_PER_MILLISECOND);
+                return;
+            }
+            if (TimeType.TIME_MILLIS.equals(type)) {
+                type.writeLong(blockBuilder, payload * Timestamps.PICOSECONDS_PER_MILLISECOND);
+                return;
+            }
         }
 
         if (type instanceof DoubleType) {
@@ -326,12 +344,7 @@ public class PulsarAvroColumnDecoder {
             return;
         }
 
-        if (type instanceof TimestampType) {
-            type.writeLong(blockBuilder, (Long) value);
-            return;
-        }
-
-        throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+        throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoder.java
index 86b63eb6de8..d52402a6e85 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoder.java
@@ -20,12 +20,12 @@ package org.apache.pulsar.sql.presto.decoder.avro;
 
 import static com.google.common.base.Functions.identity;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static java.util.Objects.requireNonNull;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.sql.presto.PulsarRowDecoder;
 
 /**
- * Refer to {@link io.prestosql.decoder.avro.AvroRowDecoder}.
+ * Refer to {@link io.trino.decoder.avro.AvroRowDecoderFactory}.
  */
 public class PulsarAvroRowDecoder implements PulsarRowDecoder {
 
@@ -65,7 +65,7 @@ public class PulsarAvroRowDecoder implements PulsarRowDecoder {
             avroRecord = record.getAvroRecord();
         } catch (Exception e) {
             e.printStackTrace();
-            throw new PrestoException(GENERIC_INTERNAL_ERROR, "Decoding avro record failed.", e);
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Decoding avro record failed.", e);
         }
         return Optional.of(columnDecoders.entrySet().stream()
                 .collect(toImmutableMap(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
index 74b0a88fcef..f8c5b8b5351 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
@@ -19,33 +19,33 @@
 package org.apache.pulsar.sql.presto.decoder.avro;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.prestosql.spi.type.DateType.DATE;
-import static io.prestosql.spi.type.TimeType.TIME;
-import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeManager;
-import io.prestosql.spi.type.TypeSignature;
-import io.prestosql.spi.type.TypeSignatureParameter;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import io.trino.spi.type.TypeSignature;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -68,7 +68,7 @@ import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
  */
 public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
 
-    private TypeManager typeManager;
+    private final TypeManager typeManager;
 
     public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
         this.typeManager = typeManager;
@@ -86,14 +86,14 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
         List<ColumnMetadata> columnMetadata;
         String schemaJson = new String(schemaInfo.getSchema());
         if (StringUtils.isBlank(schemaJson)) {
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
         Schema schema;
         try {
             schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
         } catch (SchemaParseException ex) {
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
 
@@ -110,13 +110,13 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
         } catch (StackOverflowError e){
             log.warn(e, "Topic "
                     + topicName.toString() + " extractColumnMetadata failed.");
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " schema may contains cyclic definitions.", e);
         }
         return columnMetadata;
     }
 
-    private Type parseAvroPrestoType(String fieldname, Schema schema) {
+    private Type parseAvroPrestoType(String fieldName, Schema schema) {
         Schema.Type type = schema.getType();
         LogicalType logicalType  = schema.getLogicalType();
         switch (type) {
@@ -126,7 +126,7 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
             case NULL:
                 throw new UnsupportedOperationException(
                         format("field '%s' NULL type code should not be reached,"
-                                + "please check the schema or report the bug.", fieldname));
+                                + "please check the schema or report the bug.", fieldName));
             case FIXED:
             case BYTES:
                 //  When the precision <= 0, throw Exception.
@@ -140,16 +140,15 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
-                    return TIME;
+                    return TimeType.TIME_MILLIS;
                 } else if (logicalType == LogicalTypes.date()) {
-                    return DATE;
+                    return DateType.DATE;
                 }
                 return IntegerType.INTEGER;
             case LONG:
                 if (logicalType == LogicalTypes.timestampMillis()) {
-                    return TimestampType.TIMESTAMP;
+                    return TimestampType.TIMESTAMP_MILLIS;
                 }
-                //TODO: support timestamp_microseconds logicalType : https://github.com/prestosql/presto/issues/1284
                 return BigintType.BIGINT;
             case FLOAT:
                 return RealType.REAL;
@@ -158,10 +157,10 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
             case BOOLEAN:
                 return BooleanType.BOOLEAN;
             case ARRAY:
-                return new ArrayType(parseAvroPrestoType(fieldname, schema.getElementType()));
+                return new ArrayType(parseAvroPrestoType(fieldName, schema.getElementType()));
             case MAP:
                 //The key for an avro map must be string
-                TypeSignature valueType = parseAvroPrestoType(fieldname, schema.getValueType()).getTypeSignature();
+                TypeSignature valueType = parseAvroPrestoType(fieldName, schema.getValueType()).getTypeSignature();
                 return typeManager.getParameterizedType(StandardTypes.MAP,
                         ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
                                 TypeSignatureParameter.typeParameter(valueType)));
@@ -174,16 +173,16 @@ public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
                 } else {
                     throw new UnsupportedOperationException(format(
                             "field '%s' of record type has no fields, "
-                                    + "please check schema definition. ", fieldname));
+                                    + "please check schema definition. ", fieldName));
                 }
             case UNION:
                 for (Schema nestType : schema.getTypes()) {
                     if (nestType.getType() != Schema.Type.NULL) {
-                        return parseAvroPrestoType(fieldname, nestType);
+                        return parseAvroPrestoType(fieldName, nestType);
                     }
                 }
                 throw new UnsupportedOperationException(format(
-                        "field '%s' of UNION type must contains not NULL type.", fieldname));
+                        "field '%s' of UNION type must contains not NULL type.", fieldName));
             default:
                 throw new UnsupportedOperationException(format(
                         "Can't convert from schema type '%s' (%s) to presto type.",
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index fcd2550f9ba..80404295615 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -21,23 +21,10 @@ package org.apache.pulsar.sql.presto.decoder.json;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.slice.Slices.utf8Slice;
-import static io.prestosql.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
-import static io.prestosql.spi.type.BigintType.BIGINT;
-import static io.prestosql.spi.type.BooleanType.BOOLEAN;
-import static io.prestosql.spi.type.DateType.DATE;
-import static io.prestosql.spi.type.DoubleType.DOUBLE;
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.RealType.REAL;
-import static io.prestosql.spi.type.SmallintType.SMALLINT;
-import static io.prestosql.spi.type.TimeType.TIME;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
-import static io.prestosql.spi.type.TinyintType.TINYINT;
-import static io.prestosql.spi.type.Varchars.isVarcharType;
-import static io.prestosql.spi.type.Varchars.truncateToLength;
+import static io.trino.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
+import static io.trino.spi.type.Varchars.truncateToLength;
 import static java.lang.Double.parseDouble;
 import static java.lang.Float.floatToIntBits;
-import static java.lang.Float.parseFloat;
-import static java.lang.Long.parseLong;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -46,31 +33,32 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.decoder.json.JsonFieldDecoder;
-import io.prestosql.decoder.json.JsonRowDecoderFactory;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.block.BlockBuilder;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.Decimals;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimeType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.decoder.json.JsonFieldDecoder;
+import io.trino.decoder.json.JsonRowDecoderFactory;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
@@ -78,14 +66,14 @@ import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
- * Copy from {@link io.prestosql.decoder.json.DefaultJsonFieldDecoder} (presto-record-decoder-345)
+ * Copy from {@link io.trino.decoder.json.DefaultJsonFieldDecoder} (presto-record-decoder-345)
  * with some pulsar's extensions.
- * 1) support {@link io.prestosql.spi.type.ArrayType}.
- * 2) support {@link io.prestosql.spi.type.MapType}.
- * 3) support {@link io.prestosql.spi.type.RowType}.
- * 4) support {@link io.prestosql.spi.type.TimestampType},{@link io.prestosql.spi.type.DateType},
- * {@link io.prestosql.spi.type.TimeType}.
- * 5) support {@link io.prestosql.spi.type.RealType}.
+ * 1) support {@link io.trino.spi.type.ArrayType}.
+ * 2) support {@link io.trino.spi.type.MapType}.
+ * 3) support {@link io.trino.spi.type.RowType}.
+ * 4) support {@link io.trino.spi.type.TimestampType},{@link io.trino.spi.type.DateType},
+ * {@link io.trino.spi.type.TimeType}.
+ * 5) support {@link io.trino.spi.type.RealType}.
  */
 public class PulsarJsonFieldDecoder
         implements JsonFieldDecoder {
@@ -106,14 +94,14 @@ public class PulsarJsonFieldDecoder
     }
 
     private static Pair<Long, Long> getNumRangeByType(Type type) {
-        if (type == TINYINT) {
+        if (type == TinyintType.TINYINT) {
             return Pair.of((long) Byte.MIN_VALUE, (long) Byte.MAX_VALUE);
-        } else if (type == SMALLINT) {
+        } else if (type == SmallintType.SMALLINT) {
             return Pair.of((long) Short.MIN_VALUE, (long) Short.MAX_VALUE);
-        } else if (type == INTEGER) {
+        } else if (type == IntegerType.INTEGER) {
             return Pair.of((long) Integer.MIN_VALUE, (long) Integer.MAX_VALUE);
-        } else if (type == BIGINT) {
-            return Pair.of((long) Long.MIN_VALUE, (long) Long.MAX_VALUE);
+        } else if (type == BigintType.BIGINT) {
+            return Pair.of(Long.MIN_VALUE, Long.MAX_VALUE);
         } else {
             // those values will not be used if column type is not one of mentioned above
             return Pair.of(Long.MIN_VALUE, Long.MAX_VALUE);
@@ -124,20 +112,20 @@ public class PulsarJsonFieldDecoder
         if (type instanceof DecimalType) {
             return true;
         }
-        if (isVarcharType(type)) {
+        if (type instanceof VarcharType) {
             return true;
         }
         if (ImmutableList.of(
-                BIGINT,
-                INTEGER,
-                SMALLINT,
-                TINYINT,
-                BOOLEAN,
-                DOUBLE,
-                TIMESTAMP,
-                DATE,
-                TIME,
-                REAL
+                BigintType.BIGINT,
+                IntegerType.INTEGER,
+                SmallintType.SMALLINT,
+                TinyintType.TINYINT,
+                BooleanType.BOOLEAN,
+                DoubleType.DOUBLE,
+                TimestampType.TIMESTAMP_MILLIS,
+                DateType.DATE,
+                TimeType.TIME_MILLIS,
+                RealType.REAL
         ).contains(type)) {
             return true;
         }
@@ -221,7 +209,7 @@ public class PulsarJsonFieldDecoder
             if (value.isValueNode()) {
                 return value.asBoolean();
             }
-            throw new PrestoException(
+            throw new TrinoException(
                     DECODER_CONVERSION_NOT_SUPPORTED,
                     format("could not parse non-value node as '%s' for column '%s'", type, columnName));
         }
@@ -229,32 +217,38 @@ public class PulsarJsonFieldDecoder
         public static long getLong(JsonNode value, Type type, String columnName, long minValue, long maxValue) {
             try {
                 if (type instanceof RealType) {
-                    return floatToIntBits((Float) parseFloat(value.asText()));
+                    return floatToIntBits(Float.parseFloat(value.asText()));
                 }
 
                 // If it is decimalType, need to eliminate the decimal point,
                 // and give it to presto to set the decimal point
                 if (type instanceof DecimalType) {
                     String decimalLong = value.asText().replace(".", "");
-                    return Long.valueOf(decimalLong);
+                    return Long.parseLong(decimalLong);
                 }
 
-                long longValue;
+                Long longValue;
                 if (value.isIntegralNumber() && !value.isBigInteger()) {
                     longValue = value.longValue();
-                    if (longValue >= minValue && longValue <= maxValue) {
-                        return longValue;
-                    }
                 } else if (value.isValueNode()) {
-                    longValue = parseLong(value.asText());
-                    if (longValue >= minValue && longValue <= maxValue) {
-                        return longValue;
+                    longValue = Long.parseLong(value.asText());
+                } else {
+                    longValue = null;
+                }
+
+                if (longValue != null && longValue >= minValue && longValue <= maxValue) {
+                    if (TimestampType.TIMESTAMP_MILLIS.equals(type)) {
+                        return longValue * Timestamps.MICROSECONDS_PER_MILLISECOND;
+                    }
+                    if (TimeType.TIME_MILLIS.equals(type)) {
+                        return longValue * Timestamps.PICOSECONDS_PER_MILLISECOND;
                     }
+                    return longValue;
                 }
             } catch (NumberFormatException ignore) {
                 // ignore
             }
-            throw new PrestoException(
+            throw new TrinoException(
                     DECODER_CONVERSION_NOT_SUPPORTED,
                     format("could not parse value '%s' as '%s' for column '%s'", value.asText(), type, columnName));
         }
@@ -270,7 +264,7 @@ public class PulsarJsonFieldDecoder
             } catch (NumberFormatException ignore) {
                 // ignore
             }
-            throw new PrestoException(
+            throw new TrinoException(
                     DECODER_CONVERSION_NOT_SUPPORTED,
                     format("could not parse value '%s' as '%s' for column '%s'", value.asText(), type, columnName));
 
@@ -288,7 +282,7 @@ public class PulsarJsonFieldDecoder
             }
 
             Slice slice = utf8Slice(textValue);
-            if (isVarcharType(type)) {
+            if (type instanceof VarcharType) {
                 slice = truncateToLength(slice, type);
             }
             return slice;
@@ -348,7 +342,7 @@ public class PulsarJsonFieldDecoder
             if (node instanceof JsonNode) {
                 value = (JsonNode) node;
             } else {
-                throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+                throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                         format("primitive object of '%s' as '%s' for column '%s' cann't convert to JsonNode",
                                 node.getClass(), type, columnName));
             }
@@ -377,7 +371,7 @@ public class PulsarJsonFieldDecoder
                 return;
             }
 
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'", value.getClass(), type, columnName));
         }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoder.java
index 045b25110de..ba0e4053612 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoder.java
@@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.MissingNode;
 import com.google.common.base.Splitter;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.decoder.json.JsonFieldDecoder;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.decoder.json.JsonFieldDecoder;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index fef4b39d59e..5a832eb85d8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -19,33 +19,33 @@
 package org.apache.pulsar.sql.presto.decoder.json;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.prestosql.spi.type.DateType.DATE;
-import static io.prestosql.spi.type.TimeType.TIME;
-import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toList;
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeManager;
-import io.prestosql.spi.type.TypeSignature;
-import io.prestosql.spi.type.TypeSignatureParameter;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import io.trino.spi.type.TypeSignature;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -66,7 +66,7 @@ import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
  */
 public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
 
-    private TypeManager typeManager;
+    private final TypeManager typeManager;
 
     public PulsarJsonRowDecoderFactory(TypeManager typeManager) {
         this.typeManager = typeManager;
@@ -84,7 +84,7 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
         List<ColumnMetadata> columnMetadata;
         String schemaJson = new String(schemaInfo.getSchema());
         if (StringUtils.isBlank(schemaJson)) {
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
 
@@ -92,7 +92,7 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
         try {
             schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
         } catch (SchemaParseException ex) {
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
 
@@ -109,7 +109,7 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
         } catch (StackOverflowError e) {
             log.warn(e, "Topic "
                     + topicName.toString() + " extractColumnMetadata failed.");
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " schema may contains cyclic definitions.", e);
         }
         return columnMetadata;
@@ -140,14 +140,14 @@ public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
                 return VarbinaryType.VARBINARY;
             case INT:
                 if (logicalType == LogicalTypes.timeMillis()) {
-                    return TIME;
+                    return TimeType.TIME_MILLIS;
                 } else if (logicalType == LogicalTypes.date()) {
-                    return DATE;
+                    return DateType.DATE;
                 }
                 return IntegerType.INTEGER;
             case LONG:
                 if (logicalType == LogicalTypes.timestampMillis()) {
-                    return TimestampType.TIMESTAMP;
+                    return TimestampType.TIMESTAMP_MILLIS;
                 }
                 return BigintType.BIGINT;
             case FLOAT:
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
index 2a3f51dae29..71ed9c71e5c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
@@ -18,27 +18,28 @@
  */
 package org.apache.pulsar.sql.presto.decoder.primitive;
 
-import static io.prestosql.decoder.FieldValueProviders.booleanValueProvider;
-import static io.prestosql.decoder.FieldValueProviders.bytesValueProvider;
-import static io.prestosql.decoder.FieldValueProviders.longValueProvider;
+import static io.trino.decoder.FieldValueProviders.booleanValueProvider;
+import static io.trino.decoder.FieldValueProviders.bytesValueProvider;
+import static io.trino.decoder.FieldValueProviders.longValueProvider;
 import static org.apache.pulsar.sql.presto.PulsarFieldValueProviders.doubleValueProvider;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.decoder.FieldValueProviders;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimeType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.decoder.FieldValueProviders;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
@@ -89,9 +90,13 @@ public class PulsarPrimitiveRowDecoder implements PulsarRowDecoder {
             } else if (type instanceof DateType) {
                 primitiveColumn.put(columnHandle, longValueProvider(((Date) value).getTime()));
             } else if (type instanceof TimeType) {
-                primitiveColumn.put(columnHandle, longValueProvider(((Time) value).getTime()));
+                final long millis = ((Time) value).getTime();
+                final long picos = millis * Timestamps.PICOSECONDS_PER_MILLISECOND;
+                primitiveColumn.put(columnHandle, longValueProvider(picos));
             } else if (type instanceof TimestampType) {
-                primitiveColumn.put(columnHandle, longValueProvider(((Timestamp) value).getTime()));
+                final long millis = ((Timestamp) value).getTime();
+                final long micros = millis * Timestamps.MICROSECONDS_PER_MILLISECOND;
+                primitiveColumn.put(columnHandle, longValueProvider(micros));
             } else {
                 primitiveColumn.put(columnHandle, bytesValueProvider(value.toString().getBytes()));
             }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
index 986a2d57e13..e551b912279 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
@@ -19,21 +19,21 @@
 package org.apache.pulsar.sql.presto.decoder.primitive;
 
 import io.airlift.log.Logger;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimeType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DateType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimeType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -104,9 +104,9 @@ public class PulsarPrimitiveRowDecoderFactory implements PulsarRowDecoderFactory
             case DATE:
                 return DateType.DATE;
             case TIME:
-                return TimeType.TIME;
+                return TimeType.TIME_MILLIS;
             case TIMESTAMP:
-                return TimestampType.TIMESTAMP;
+                return TimestampType.TIMESTAMP_MILLIS;
             default:
                 log.error("Can't convert type: %s for %s", pulsarType, fieldName);
                 return null;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeColumnDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeColumnDecoder.java
index 71ed707c7b1..755341644e9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeColumnDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeColumnDecoder.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.sql.presto.decoder.protobufnative;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.slice.Slices.utf8Slice;
-import static io.prestosql.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
-import static io.prestosql.spi.StandardErrorCode.GENERIC_USER_ERROR;
-import static io.prestosql.spi.type.Varchars.truncateToLength;
+import static io.trino.decoder.DecoderErrorCode.DECODER_CONVERSION_NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.type.Varchars.truncateToLength;
 import static java.lang.Float.floatToIntBits;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -34,26 +34,27 @@ import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.EnumValue;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.block.BlockBuilder;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.RowType.Field;
-import io.prestosql.spi.type.SmallintType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TinyintType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.RowType.Field;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.TinyintType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,7 +71,7 @@ public class PulsarProtobufNativeColumnDecoder {
             RealType.REAL,
             DoubleType.DOUBLE,
             VarbinaryType.VARBINARY,
-            TimestampType.TIMESTAMP);
+            TimestampType.TIMESTAMP_MILLIS);
 
     private final Type columnType;
     private final String columnMapping;
@@ -93,7 +94,7 @@ public class PulsarProtobufNativeColumnDecoder {
             checkArgument(isSupportedType(columnType),
                     "Unsupported column type '%s' for column '%s'", columnType, columnName);
         } catch (IllegalArgumentException e) {
-            throw new PrestoException(GENERIC_USER_ERROR, e);
+            throw new TrinoException(GENERIC_USER_ERROR, e);
         }
     }
 
@@ -169,7 +170,7 @@ public class PulsarProtobufNativeColumnDecoder {
             if (value instanceof Double || value instanceof Float) {
                 return ((Number) value).doubleValue();
             }
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -179,7 +180,7 @@ public class PulsarProtobufNativeColumnDecoder {
             if (value instanceof Boolean) {
                 return (Boolean) value;
             }
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -195,15 +196,16 @@ public class PulsarProtobufNativeColumnDecoder {
             }
 
             //return millisecond which parsed from protobuf/timestamp
-            if (columnType instanceof TimestampType && value instanceof DynamicMessage) {
+            if (TimestampType.TIMESTAMP_MILLIS.equals(columnType) && value instanceof DynamicMessage) {
                 DynamicMessage message = (DynamicMessage) value;
                 int nanos = (int) message.getField(message.getDescriptorForType().findFieldByName("nanos"));
                 long seconds = (long) message.getField(message.getDescriptorForType().findFieldByName("seconds"));
                 //maybe an exception here, but seems will never happen in hundred years.
-                return seconds * MILLIS_PER_SECOND + nanos / NANOS_PER_MILLISECOND;
+                long millis = seconds * MILLIS_PER_SECOND + nanos / NANOS_PER_MILLISECOND;
+                return millis * Timestamps.MICROSECONDS_PER_MILLISECOND;
             }
 
-            throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+            throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                     format("cannot decode object of '%s' as '%s' for column '%s'",
                             value.getClass(), columnType, columnName));
         }
@@ -233,7 +235,7 @@ public class PulsarProtobufNativeColumnDecoder {
             return truncateToLength(utf8Slice(value.toString()), type);
         }
 
-        throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+        throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
     }
@@ -308,7 +310,7 @@ public class PulsarProtobufNativeColumnDecoder {
             return;
         }
 
-        throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
+        throw new TrinoException(DECODER_CONVERSION_NOT_SUPPORTED,
                 format("cannot decode object of '%s' as '%s' for column '%s'",
                         value.getClass(), type, columnName));
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoder.java
index 6f28a5158e3..6b8235586a5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoder.java
@@ -20,14 +20,14 @@ package org.apache.pulsar.sql.presto.decoder.protobufnative;
 
 import static com.google.common.base.Functions.identity;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static java.util.Objects.requireNonNull;
 import com.google.protobuf.DynamicMessage;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -69,7 +69,7 @@ public class PulsarProtobufNativeRowDecoder implements PulsarRowDecoder {
             dynamicMessage = record.getProtobufRecord();
         } catch (Exception e) {
             log.error(e);
-            throw new PrestoException(GENERIC_INTERNAL_ERROR, "Decoding protobuf record failed.", e);
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Decoding protobuf record failed.", e);
         }
         return Optional.of(columnDecoders.entrySet().stream()
                 .collect(toImmutableMap(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoderFactory.java
index e1afb0fb846..daecd5dd669 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoderFactory.java
@@ -19,30 +19,30 @@
 package org.apache.pulsar.sql.presto.decoder.protobufnative;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
 import static java.util.stream.Collectors.toList;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.TimestampProto;
 import io.airlift.log.Logger;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeManager;
-import io.prestosql.spi.type.TypeSignature;
-import io.prestosql.spi.type.TypeSignatureParameter;
-import io.prestosql.spi.type.VarbinaryType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.BooleanType;
+import io.trino.spi.type.DoubleType;
+import io.trino.spi.type.IntegerType;
+import io.trino.spi.type.RealType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
+import io.trino.spi.type.TypeSignature;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarbinaryType;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -60,7 +60,7 @@ import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
  */
 public class PulsarProtobufNativeRowDecoderFactory implements PulsarRowDecoderFactory {
 
-    private TypeManager typeManager;
+    private final TypeManager typeManager;
 
     public PulsarProtobufNativeRowDecoderFactory(TypeManager typeManager) {
         this.typeManager = typeManager;
@@ -79,7 +79,7 @@ public class PulsarProtobufNativeRowDecoderFactory implements PulsarRowDecoderFa
         List<ColumnMetadata> columnMetadata;
         String schemaJson = new String(schemaInfo.getSchema());
         if (StringUtils.isBlank(schemaJson)) {
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
         Descriptors.Descriptor schema;
@@ -89,7 +89,7 @@ public class PulsarProtobufNativeRowDecoderFactory implements PulsarRowDecoderFa
                             .getProtobufNativeSchema();
         } catch (Exception ex) {
             log.error(ex);
-            throw new PrestoException(NOT_SUPPORTED, "Topic "
+            throw new TrinoException(NOT_SUPPORTED, "Topic "
                     + topicName.toString() + " does not have a valid schema");
         }
 
@@ -149,7 +149,7 @@ public class PulsarProtobufNativeRowDecoderFactory implements PulsarRowDecoderFa
                 } else {
                     if (TimestampProto.getDescriptor().toProto().getName().equals(msg.getFile().toProto().getName())) {
                         //if msg type is protobuf/timestamp
-                        dataType = TimestampType.TIMESTAMP;
+                        dataType = TimestampType.TIMESTAMP_MILLIS;
                     } else {
                         //row
                         dataType = RowType.from(msg.getFields().stream()
diff --git a/pulsar-sql/presto-pulsar/src/main/resources/META-INF/services/io.prestosql.spi.Plugin b/pulsar-sql/presto-pulsar/src/main/resources/META-INF/services/io.trino.spi.Plugin
similarity index 100%
rename from pulsar-sql/presto-pulsar/src/main/resources/META-INF/services/io.prestosql.spi.Plugin
rename to pulsar-sql/presto-pulsar/src/main/resources/META-INF/services/io.trino.spi.Plugin
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java
index d4b3d755545..35809450dcc 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestCacheSizeAllocator.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.sql.presto;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
-import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.testing.TestingConnectorContext;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.testing.TestingConnectorContext;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
@@ -58,7 +58,7 @@ public class TestCacheSizeAllocator extends MockedPulsarServiceBaseTest {
 
     @BeforeClass
     @Override
-    protected void setup() throws Exception {
+    public void setup() throws Exception {
         super.internalSetup();
 
         admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
@@ -72,12 +72,12 @@ public class TestCacheSizeAllocator extends MockedPulsarServiceBaseTest {
 
     @AfterClass
     @Override
-    protected void cleanup() throws Exception {
+    public void cleanup() throws Exception {
         super.internalCleanup();
     }
 
     @DataProvider(name = "cacheSizeProvider")
-    private Object[][] dataProvider() {
+    public Object[][] dataProvider() {
         return new Object[][] {
                 {-1}, {0}, {2000}, {5000}
         };
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
index e763a8e03e5..b2c7d2c2584 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -18,15 +18,15 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
-import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import com.google.common.collect.Sets;
 import io.jsonwebtoken.SignatureAlgorithm;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.security.ConnectorIdentity;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.security.ConnectorIdentity;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Optional;
@@ -52,7 +52,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
 
     @BeforeClass
     @Override
-    protected void setup() throws Exception {
+    public void setup() throws Exception {
         conf.setAuthenticationEnabled(true);
         conf.setAuthenticationProviders(
                 Sets.newHashSet("org.apache.pulsar.broker.authentication.AuthenticationProviderToken"));
@@ -80,7 +80,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
 
     @AfterClass
     @Override
-    protected void cleanup() throws Exception {
+    public void cleanup() throws Exception {
         internalCleanup();
     }
 
@@ -112,7 +112,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
         try {
             pulsarAuth.checkTopicAuth(session, "test");
             Assert.fail(); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("The credential information is empty"));
         }
@@ -124,7 +124,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
         try {
             pulsarAuth.checkTopicAuth(session, "test");
             Assert.fail(); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("Please specify the auth-method and auth-params"));
         }
@@ -135,7 +135,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
         try {
             pulsarAuth.checkTopicAuth(session, "test");
             Assert.fail(); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("Please specify the auth-method and auth-params"));
         }
@@ -186,7 +186,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
         try {
             pulsarAuth.checkTopicAuth(session, otherTopic);
             Assert.fail(); // should fail
-        } catch (PrestoException e){
+        } catch (TrinoException e){
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("not authorized"));
         }
@@ -207,7 +207,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
             }}).when(identity).getExtraCredentials();
             pulsarAuth.checkTopicAuth(session, topic);
             Assert.fail(); // should fail
-        } catch (PrestoException e){
+        } catch (TrinoException e){
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("Unable to authenticate"));
         }
@@ -224,7 +224,7 @@ public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
             }}).when(identity).getExtraCredentials();
             pulsarAuth.checkTopicAuth(session, topic);
             Assert.fail(); // should fail
-        } catch (PrestoException e){
+        } catch (TrinoException e){
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
             Assert.assertTrue(e.getMessage().contains("not authorized"));
         }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index a03eb3f5a72..ad1c0f4b03d 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.sql.presto;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.testing.TestingConnectorContext;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.testing.TestingConnectorContext;
 import java.math.BigDecimal;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -91,7 +91,7 @@ import static org.testng.Assert.assertNotNull;
 
 public abstract class TestPulsarConnector {
 
-    protected static final long currentTimeMs = 1534806330000L;
+    protected static final long currentTimeMicros = 1534806330000000L;
 
     protected PulsarConnectorConfig pulsarConnectorConfig;
 
@@ -405,7 +405,7 @@ public abstract class TestPulsarConnector {
 
             MessageMetadata messageMetadata = new MessageMetadata()
                     .setProducerName("test-producer").setSequenceId(i)
-                    .setPublishTime(currentTimeMs + i);
+                    .setPublishTime(currentTimeMicros / 1000 + i);
 
             Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 5335d8ad8f7..dfdee9360b4 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.connector.*;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.*;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
@@ -35,9 +35,9 @@ import javax.ws.rs.core.Response;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.trino.spi.StandardErrorCode.NOT_FOUND;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.*;
 
@@ -142,7 +142,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
             fail("Invalid schema should have generated an exception");
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
             assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
         }
@@ -208,7 +208,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
             fail("Table without schema should have generated an exception");
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
             assertEquals(e.getMessage(),
                     "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
@@ -235,7 +235,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
             fail("Table without schema should have generated an exception");
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
             assertEquals(e.getMessage(),
                     "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
@@ -404,14 +404,14 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         // Test getColumnHandles should pass the auth check
         this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), pulsarTableHandle);
 
-        doThrow(new PrestoException(PERMISSION_DENIED, "not authorized")).when(this.pulsarAuth)
+        doThrow(new TrinoException(PERMISSION_DENIED, "not authorized")).when(this.pulsarAuth)
                 .checkTopicAuth(isA(ConnectorSession.class), isA(String.class));
 
         // Test getTableHandle should fail the auth check
         try {
             this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), schemaTableName);
             Assert.fail("Test getTableHandle should fail the auth check"); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
         }
 
@@ -420,7 +420,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                     pulsarTableHandle);
             Assert.fail("Test getTableMetadata should fail the auth check"); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
         }
 
@@ -428,7 +428,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         try {
             this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), pulsarTableHandle);
             Assert.fail("Test getColumnHandles should fail the auth check"); // should fail
-        } catch (PrestoException e) {
+        } catch (TrinoException e) {
             Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
         }
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 23dc69245f0..1aeb00d7469 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -21,11 +21,11 @@ package org.apache.pulsar.sql.presto;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarcharType;
 import java.math.BigDecimal;
 import lombok.Data;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index c7f6ce10759..0a7c76f632f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -22,14 +22,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.json.JsonCodec;
 import io.airlift.log.Logger;
-import io.prestosql.spi.connector.ColumnHandle;
-import io.prestosql.spi.connector.ConnectorSession;
-import io.prestosql.spi.connector.ConnectorSplitSource;
-import io.prestosql.spi.connector.ConnectorTransactionHandle;
-import io.prestosql.spi.predicate.Domain;
-import io.prestosql.spi.predicate.Range;
-import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.spi.predicate.ValueSet;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.predicate.ValueSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
@@ -50,8 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.TimestampType.TIMESTAMP;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -201,8 +201,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
 
         Map<ColumnHandle, Domain> domainMap = new HashMap<>();
-        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
-                currentTimeMs + 50L, true)), false);
+        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMicros + 1000L, true,
+                currentTimeMicros + 50000L, true)), false);
         domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
         TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
 
@@ -258,8 +258,8 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
 
         Map<ColumnHandle, Domain> domainMap = new HashMap<>();
-        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
-                currentTimeMs + 50L, true)), false);
+        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMicros + 1000L, true,
+                currentTimeMicros + 50000L, true)), false);
         domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
         TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
index 0a02dc30825..7f4b9d365c7 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.sql.presto;
 
 import com.google.common.collect.Sets;
-import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.predicate.TupleDomain;
-import io.prestosql.testing.TestingConnectorContext;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.testing.TestingConnectorContext;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -74,7 +74,7 @@ public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
 
     @BeforeClass
     @Override
-    protected void setup() throws Exception {
+    public void setup() throws Exception {
         conf.setMaxMessageSize(MAX_MESSAGE_SIZE);
         conf.setManagedLedgerMaxEntriesPerLedger(5);
         conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
@@ -91,7 +91,7 @@ public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
 
     @AfterClass
     @Override
-    protected void cleanup() throws Exception {
+    public void cleanup() throws Exception {
         internalCleanup();
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
index 005ff8cd703..eac59ffcd81 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
@@ -19,13 +19,13 @@
 package org.apache.pulsar.sql.presto.decoder;
 
 import io.airlift.slice.Slice;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.connector.ColumnMetadata;
-import io.prestosql.spi.connector.ConnectorContext;
-import io.prestosql.spi.type.Type;
-import io.prestosql.testing.TestingConnectorContext;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.type.Type;
+import io.trino.testing.TestingConnectorContext;
 import java.math.BigDecimal;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
index 496a6f061bf..22a8f56cd28 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/DecoderTestUtil.java
@@ -19,21 +19,21 @@
 package org.apache.pulsar.sql.presto.decoder;
 
 import io.airlift.slice.Slice;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.Decimals;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.Type;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.Type;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Map;
 
-import static io.prestosql.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH;
-import static io.prestosql.testing.TestingConnectorSession.SESSION;
+import static io.trino.spi.type.UnscaledDecimal128Arithmetic.UNSCALED_DECIMAL_128_SLICE_LENGTH;
+import static io.trino.testing.TestingConnectorSession.SESSION;
 import static org.testng.Assert.*;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/AvroDecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/AvroDecoderTestUtil.java
index 521807f80f0..398957f5746 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/AvroDecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/AvroDecoderTestUtil.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.sql.presto.decoder.avro;
 
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.type.*;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.*;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.String.format;
 import static org.testng.Assert.*;
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 2478300dcaa..cb153236ed6 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -20,17 +20,18 @@ package org.apache.pulsar.sql.presto.decoder.avro;
 
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeSignatureParameter;
-import io.prestosql.spi.type.VarcharType;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarcharType;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,12 +52,14 @@ import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 
-import static io.prestosql.spi.type.BigintType.BIGINT;
-import static io.prestosql.spi.type.BooleanType.BOOLEAN;
-import static io.prestosql.spi.type.DoubleType.DOUBLE;
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.RealType.REAL;
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.TimeType.TIME_MILLIS;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.Float.floatToIntBits;
 import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
 import static org.testng.Assert.assertEquals;
@@ -128,6 +131,14 @@ public class TestAvroDecoder extends AbstractDecoderTester {
         PulsarColumnHandle enumFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
                 "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
+
+        PulsarColumnHandle timestampFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "timestampField", TIMESTAMP_MILLIS, false, false, "timestampField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, timestampFieldColumnHandle, message.timestampField * Timestamps.MICROSECONDS_PER_MILLISECOND);
+
+        PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
     }
 
     @Test
@@ -306,7 +317,7 @@ public class TestAvroDecoder extends AbstractDecoderTester {
     @Test(singleThreaded = true)
     public void testCyclicDefinitionDetect() {
         AvroSchema cyclicSchema = AvroSchema.of(DecoderTestMessage.CyclicFoo.class);
-        PrestoException exception = expectThrows(PrestoException.class,
+        TrinoException exception = expectThrows(TrinoException.class,
                 () -> {
                     decoderFactory.extractColumnMetadata(topicName, cyclicSchema.getSchemaInfo(),
                             PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/JsonDecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/JsonDecoderTestUtil.java
index f522a787b9f..c5395f21422 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/JsonDecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/JsonDecoderTestUtil.java
@@ -22,15 +22,15 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Iterators;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.type.*;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.*;
 import org.apache.pulsar.sql.presto.decoder.DecoderTestUtil;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.String.format;
 import static org.testng.Assert.*;
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 0b8a8f84eda..2a60260557a 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -18,13 +18,43 @@
  */
 package org.apache.pulsar.sql.presto.decoder.json;
 
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.TimeType.TIME_MILLIS;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Float.floatToIntBits;
+import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.type.*;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.BigintType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeSignatureParameter;
+import io.trino.spi.type.VarcharType;
 import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -34,24 +64,6 @@ import org.apache.pulsar.sql.presto.decoder.DecoderTestMessage;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.ZoneId;
-import java.time.temporal.ChronoUnit;
-import java.util.*;
-
-import static io.prestosql.spi.type.BigintType.BIGINT;
-import static io.prestosql.spi.type.BooleanType.BOOLEAN;
-import static io.prestosql.spi.type.DoubleType.DOUBLE;
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.RealType.REAL;
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
-import static java.lang.Float.floatToIntBits;
-import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.expectThrows;
-
 public class TestJsonDecoder extends AbstractDecoderTester {
 
     private JSONSchema schema;
@@ -118,6 +130,13 @@ public class TestJsonDecoder extends AbstractDecoderTester {
                 "enumField", VARCHAR, false, false, "enumField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
         checkValue(decodedRow, enumFieldColumnHandle, message.enumField.toString());
 
+        PulsarColumnHandle timestampFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "timestampField", TIMESTAMP_MILLIS, false, false, "timestampField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, timestampFieldColumnHandle, message.timestampField * Timestamps.MICROSECONDS_PER_MILLISECOND);
+
+        PulsarColumnHandle timeFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
+                "timeField", TIME_MILLIS, false, false, "timeField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+        checkValue(decodedRow, timeFieldColumnHandle, (long) message.timeField * Timestamps.PICOSECONDS_PER_MILLISECOND);
     }
 
     @Test
@@ -290,7 +309,7 @@ public class TestJsonDecoder extends AbstractDecoderTester {
     @Test(singleThreaded = true)
     public void testCyclicDefinitionDetect() {
         JSONSchema cyclicSchema = JSONSchema.of(DecoderTestMessage.CyclicFoo.class);
-        PrestoException exception = expectThrows(PrestoException.class,
+        TrinoException exception = expectThrows(TrinoException.class,
                 () -> {
                     decoderFactory.extractColumnMetadata(topicName, cyclicSchema.getSchemaInfo(),
                             PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/PrimitiveDecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/PrimitiveDecoderTestUtil.java
index 53813d5707d..81a480a3b84 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/PrimitiveDecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/PrimitiveDecoderTestUtil.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.sql.presto.decoder.primitive;
 
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.type.Type;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.Type;
 import org.apache.pulsar.sql.presto.decoder.DecoderTestUtil;
 
 /**
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
index 61142a1bfa2..54b11cebeb2 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
@@ -18,9 +18,31 @@
  */
 package org.apache.pulsar.sql.presto.decoder.primitive;
 
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimeType.TIME_MILLIS;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
 import io.airlift.slice.Slices;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.type.Timestamps;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -30,27 +52,6 @@ import org.apache.pulsar.sql.presto.decoder.AbstractDecoderTester;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static io.prestosql.spi.type.DateType.DATE;
-import static io.prestosql.spi.type.SmallintType.SMALLINT;
-import static io.prestosql.spi.type.TimeType.TIME;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
-import static io.prestosql.spi.type.TinyintType.TINYINT;
-import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
-import static io.prestosql.spi.type.BigintType.BIGINT;
-import static io.prestosql.spi.type.BooleanType.BOOLEAN;
-import static io.prestosql.spi.type.DoubleType.DOUBLE;
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.RealType.REAL;
-import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
-
 public class TestPrimitiveDecoder extends AbstractDecoderTester {
 
     public static final String PRIMITIVE_COLUMN_NAME = "__value__";
@@ -63,7 +64,6 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
 
     @Test(singleThreaded = true)
     public void testPrimitiveType() {
-
         byte int8Value = 1;
         SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
         Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
@@ -200,7 +200,8 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, DATE, false, false, PRIMITIVE_COLUMN_NAME, null, null,
                 PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
 
-        Time timeValue = new Time(System.currentTimeMillis());
+        LocalTime now = LocalTime.now(ZoneId.systemDefault());
+        Time timeValue = Time.valueOf(now);
         SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
         Schema schemaTime = Schema.getSchema(schemaInfoTime);
         List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
@@ -211,8 +212,8 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 pulsarRowDecoderTime.decodeRow(io.netty.buffer.Unpooled
                         .copiedBuffer(schemaTime.encode(timeValue))).get();
         checkValue(decodedRowTime, new PulsarColumnHandle(getPulsarConnectorId().toString(),
-                PRIMITIVE_COLUMN_NAME, TIME, false, false, PRIMITIVE_COLUMN_NAME, null, null,
-                PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
+                PRIMITIVE_COLUMN_NAME, TIME_MILLIS, false, false, PRIMITIVE_COLUMN_NAME, null, null,
+                PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime() * Timestamps.PICOSECONDS_PER_MILLISECOND);
 
         Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
         SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
@@ -225,8 +226,8 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 pulsarRowDecoderTimestamp.decodeRow(io.netty.buffer.Unpooled
                         .copiedBuffer(schemaTimestamp.encode(timestampValue))).get();
         checkValue(decodedRowTimestamp, new PulsarColumnHandle(getPulsarConnectorId().toString(),
-                PRIMITIVE_COLUMN_NAME, TIMESTAMP, false, false, PRIMITIVE_COLUMN_NAME, null, null,
-                PulsarColumnHandle.HandleKeyValueType.NONE), timestampValue.getTime());
+                PRIMITIVE_COLUMN_NAME, TIMESTAMP_MILLIS, false, false, PRIMITIVE_COLUMN_NAME, null, null,
+                PulsarColumnHandle.HandleKeyValueType.NONE), timestampValue.getTime() * Timestamps.MICROSECONDS_PER_MILLISECOND);
 
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/ProtobufNativeDecoderTestUtil.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/ProtobufNativeDecoderTestUtil.java
index 1a491720c59..191dad14c61 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/ProtobufNativeDecoderTestUtil.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/ProtobufNativeDecoderTestUtil.java
@@ -21,12 +21,12 @@ package org.apache.pulsar.sql.presto.decoder.protobufnative;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.EnumValue;
-import io.prestosql.spi.block.Block;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.SqlVarbinary;
-import io.prestosql.spi.type.Type;
+import io.trino.spi.block.Block;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.SqlVarbinary;
+import io.trino.spi.type.Type;
 import org.apache.pulsar.sql.presto.decoder.DecoderTestUtil;
 
 import java.util.List;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/TestProtobufNativeDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/TestProtobufNativeDecoder.java
index c4c6cb2e6f3..69acc6e6564 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/TestProtobufNativeDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/protobufnative/TestProtobufNativeDecoder.java
@@ -18,17 +18,29 @@
  */
 package org.apache.pulsar.sql.presto.decoder.protobufnative;
 
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
+import static org.testng.Assert.assertTrue;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import io.netty.buffer.ByteBuf;
-import io.prestosql.decoder.DecoderColumnHandle;
-import io.prestosql.decoder.FieldValueProvider;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeSignatureParameter;
+import io.trino.decoder.DecoderColumnHandle;
+import io.trino.decoder.FieldValueProvider;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.StandardTypes;
+import io.trino.spi.type.Timestamps;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeSignatureParameter;
+import java.util.HashSet;
+import java.util.Map;
 import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeRecord;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
@@ -37,19 +49,6 @@ import org.apache.pulsar.sql.presto.decoder.AbstractDecoderTester;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.HashSet;
-import java.util.Map;
-
-import static io.prestosql.spi.type.BigintType.BIGINT;
-import static io.prestosql.spi.type.BooleanType.BOOLEAN;
-import static io.prestosql.spi.type.DoubleType.DOUBLE;
-import static io.prestosql.spi.type.IntegerType.INTEGER;
-import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
-import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
-import static io.prestosql.spi.type.VarcharType.VARCHAR;
-import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
-import static org.testng.Assert.assertTrue;
-
 public class TestProtobufNativeDecoder extends AbstractDecoderTester {
 
     private ProtobufNativeSchema schema;
@@ -173,9 +172,9 @@ public class TestProtobufNativeDecoder extends AbstractDecoderTester {
         checkValue(decodedRow, enumFieldColumnHandle, testMessage.getTestEnum().name());
 
         PulsarColumnHandle timestampFieldColumnHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(),
-                "timestampField", TIMESTAMP,false,false,"timestampField",null,null,
+                "timestampField", TIMESTAMP_MILLIS,false,false,"timestampField",null,null,
                 PulsarColumnHandle.HandleKeyValueType.NONE);
-        checkValue(decodedRow, timestampFieldColumnHandle, mills);
+        checkValue(decodedRow, timestampFieldColumnHandle, mills * Timestamps.MICROSECONDS_PER_MILLISECOND);
 
     }
 
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 477242799d4..c491d005fda 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -178,9 +178,9 @@
     </dependency>
 
     <dependency>
-      <groupId>io.prestosql</groupId>
-      <artifactId>presto-jdbc</artifactId>
-      <version>${presto.version}</version>
+      <groupId>io.trino</groupId>
+      <artifactId>trino-jdbc</artifactId>
+      <version>${trino.version}</version>
       <scope>test</scope>
     </dependency>
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index c15a455b1ae..b85ce455b00 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -112,6 +112,7 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
                 && pulsarCluster.getSqlFollowWorkerContainers().size() > 0) {
             OkHttpClient okHttpClient = new OkHttpClient();
             Request request = new Request.Builder()
+                    .header("X-Trino-User", "test-user")
                     .url("http://" + pulsarCluster.getPrestoWorkerContainer().getUrl() + "/v1/node")
                     .build();
             do {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
index 902de4dd4ad..8c72ef8f946 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java
@@ -82,7 +82,7 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
             log.error("The presto work container isn't exist.");
             return;
         }
-        String url = String.format("jdbc:presto://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
+        String url = String.format("jdbc:trino://%s",  pulsarCluster.getPrestoWorkerContainer().getUrl());
         connection = DriverManager.getConnection(url, "test", null);
     }