You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/01 08:59:57 UTC

[pulsar] branch master updated: [feature][sql] Add Pulsar Auth support for the Pulsar SQL (#15571)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97fff81b2fc [feature][sql] Add Pulsar Auth support for the Pulsar SQL (#15571)
97fff81b2fc is described below

commit 97fff81b2fccd0b640c7b31a882e4c9519192380
Author: Zike Yang <zi...@apache.org>
AuthorDate: Wed Jun 1 16:59:50 2022 +0800

    [feature][sql] Add Pulsar Auth support for the Pulsar SQL (#15571)
    
    ### Motivation
    
    Currently, Pulsar SQL is not compatible with the authentication and authorization of the Pulsar broker. We don't have any security-related integration between Pulsar and Presto.
    
    We could implement a Presto access control plugin to interface with the Pulsar broker's authorization module, but that approach does not handle authentication well.
    
    ### Modifications
    
    This PR adds authentication and authorization support between Pulsar and Pulsar SQL by using the extra credentials properties. The request from the pulsar SQL client can have auth-related parameters(like `auth-plugin` and `auth-params`) attached to the extra credentials. The pulsar SQL worker can therefore obtain auth information and use PulsarClient to initiate an auth verification request to the broker. In this way, we can invoke the broker's authentication and authorization module  [...]
    
    This PR adds a new class `PulsarAuth` to the SQL worker. This class implements the authentication and authorization integration between the Pulsar SQL worker and the Pulsar broker. It will check permissions against the session-topic pair by trying to subscribe to a topic using the Pulsar Reader to check the consumption privilege. The same topic will only be checked once during the same session.
    
    Regarding compatibility, this PR is a non-intrusive change: the user decides whether to enable this feature via the `pulsar.authorization-enabled` setting in the `pulsar.properties` file.
---
 conf/presto/catalog/pulsar.properties              |  11 +-
 pulsar-sql/presto-pulsar/pom.xml                   |   7 +
 .../org/apache/pulsar/sql/presto/PulsarAuth.java   | 134 +++++++++++
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  21 ++
 .../pulsar/sql/presto/PulsarConnectorModule.java   |   1 +
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  33 ++-
 .../apache/pulsar/sql/presto/TestPulsarAuth.java   | 250 +++++++++++++++++++++
 .../pulsar/sql/presto/TestPulsarConnector.java     |  12 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  58 +++++
 .../sql/presto/decoder/AbstractDecoderTester.java  |   4 +-
 .../integration/presto/TestPulsarSQLAuth.java      | 227 +++++++++++++++++++
 .../integration/presto/TestPulsarSQLBase.java      |  22 +-
 .../integration/src/test/resources/pulsar-sql.xml  |   1 +
 13 files changed, 767 insertions(+), 14 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index e273b98dccc..8cbea0e1364 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -24,6 +24,8 @@ connector.name=pulsar
 pulsar.broker-service-url=http://localhost:8080
 # the url of Pulsar broker web service
 pulsar.web-service-url=http://localhost:8080
+# the url of Pulsar broker binary service
+pulsar.broker-binary-service-url=pulsar://localhost:6650
 # URI of Zookeeper cluster
 pulsar.zookeeper-uri=127.0.0.1:2181
 # minimum number of entries to read at a single time
@@ -66,10 +68,10 @@ pulsar.rewrite-namespace-delimiter=/
 ####### AUTHENTICATION CONFIGS #######
 
 ## the authentication plugin to be used to authenticate to Pulsar cluster
-#pulsar.auth-plugin =
+#pulsar.auth-plugin=
 
 ## the authentication parameter to be used to authenticate to Pulsar cluster
-#pulsar.auth-params =
+#pulsar.auth-params=
 
 ## Accept untrusted TLS certificate
 #pulsar.tls-allow-insecure-connection =
@@ -80,6 +82,11 @@ pulsar.rewrite-namespace-delimiter=/
 ## Path for the trusted TLS certificate file
 #pulsar.tls-trust-cert-file-path =
 
+####### PULSAR AUTHORIZATION CONFIGS #######
+
+## Whether to enable pulsar authorization
+pulsar.authorization-enabled=false
+
 ####### BOOKKEEPER CONFIGS #######
 
 # Entries read count throttling-limit per seconds, 0 is represents disable the throttle, default is 0.
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index b9749df772f..91548128624 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -110,6 +110,13 @@
             <version>${javax.annotation-api.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.jsonwebtoken</groupId>
+            <artifactId>jjwt-impl</artifactId>
+            <version>${jsonwebtoken.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.prestosql</groupId>
             <artifactId>presto-main</artifactId>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
new file mode 100644
index 00000000000..b94e4a27611
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+/**
+ * This class implements the authentication and authorization integration between the Pulsar SQL worker and the
+ * Pulsar broker.
+ *
+ * It will check permissions against the session-topic pair by trying to subscribe to a topic using the Pulsar Reader
+ * to check the consumption privilege. The same topic will only be checked once during the same session.
+ */
+public class PulsarAuth {
+    private static final Logger log = Logger.get(PulsarAuth.class);
+    private static final String CREDENTIALS_AUTH_PLUGIN = "auth-plugin";
+    private static final String CREDENTIALS_AUTH_PARAMS = "auth-params";
+
+    private final PulsarConnectorConfig pulsarConnectorConfig;
+    // Topics already verified, keyed by query id; an entry lives until cleanSession() removes it.
+    @VisibleForTesting
+    final Map<String, Set<String>> authorizedQueryTopicsMap = new ConcurrentHashMap<>();
+
+    @Inject
+    public PulsarAuth(PulsarConnectorConfig pulsarConnectorConfig) {
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
+        if (pulsarConnectorConfig.getAuthorizationEnabled() && StringUtils.isEmpty(
+                pulsarConnectorConfig.getBrokerBinaryServiceUrl())) {
+            throw new IllegalArgumentException(
+                    "pulsar.broker-binary-service-url must be present when pulsar.authorization-enabled is true.");
+        }
+    }
+
+    /**
+     * Check if the session has read access to the topic.
+     * It subscribes a short-lived non-durable consumer, using the auth-plugin and auth-params extra credentials.
+     * The same topic will only be checked once for the same query id; failures raise a PrestoException.
+     */
+    public void checkTopicAuth(ConnectorSession session, String topic) {
+        Set<String> authorizedTopics =
+                authorizedQueryTopicsMap.computeIfAbsent(session.getQueryId(), query -> new HashSet<>());
+        if (authorizedTopics.contains(topic)) {
+            if (log.isDebugEnabled()) {
+                log.debug("The topic %s is already authorized.", topic);
+            }
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Checking the authorization for the topic: %s", topic);
+        }
+        Map<String, String> extraCredentials = session.getIdentity().getExtraCredentials();
+        if (extraCredentials.isEmpty()) { // the extraCredentials won't be null
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: The credential information is empty.",
+                            topic));
+        }
+        String authMethod = extraCredentials.get(CREDENTIALS_AUTH_PLUGIN);
+        String authParams = extraCredentials.get(CREDENTIALS_AUTH_PARAMS);
+        if (StringUtils.isEmpty(authMethod) || StringUtils.isEmpty(authParams)) {
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: Required credential parameters are "
+                                    + "missing. Please specify the auth-method and auth-params in the extra "
+                                    + "credentials.",
+                            topic));
+        }
+        // Close the per-check client when done (try-with-resources) so that it is not leaked on every check.
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarConnectorConfig.getBrokerBinaryServiceUrl())
+                .authentication(authMethod, authParams)
+                .build()) {
+            client.newConsumer().topic(topic)
+                    .subscriptionName("pulsar-sql-auth" + session.getQueryId())
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionMode(SubscriptionMode.NonDurable)
+                    .startPaused(true)
+                    .subscribe()
+                    .close();
+            authorizedQueryTopicsMap.computeIfPresent(session.getQueryId(), (query, topics) -> {
+                topics.add(topic);
+                return topics;
+            });
+            if (log.isDebugEnabled()) {
+                log.debug("Check the authorization for the topic %s successfully.", topic);
+            }
+        } catch (PulsarClientException.AuthenticationException | PulsarClientException.AuthorizationException e) {
+            throw new PrestoException(PERMISSION_DENIED,
+                    String.format("Failed to access topic %s: %s", topic, e.getLocalizedMessage()), e);
+        } catch (IOException e) {
+            throw new PrestoException(QUERY_REJECTED, String.format(
+                    "Failed to check authorization for topic %s: %s", topic, e.getLocalizedMessage()), e);
+        }
+    }
+
+    /**
+     * Clear the auth verification status recorded for the session's query id; call it once the query is done.
+     */
+    public void cleanSession(ConnectorSession session) {
+        authorizedQueryTopicsMap.remove(session.getQueryId());
+    }
+}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 0e5c2b4e95f..2bd2570d54d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.protocol.Commands;
 public class PulsarConnectorConfig implements AutoCloseable {
 
     private String brokerServiceUrl = "http://localhost:8080";
+    private String brokerBinaryServiceUrl = "pulsar://localhost:6650/";
     private String webServiceUrl = ""; //leave empty
     private String zookeeperUri = "localhost:2181";
     private int entryReadBatchSize = 100;
@@ -63,6 +64,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private boolean namespaceDelimiterRewriteEnable = false;
     private String rewriteNamespaceDelimiter = "/";
 
+    private boolean authorizationEnabled = false;
+
     // --- Ledger Offloading ---
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = 2;
@@ -103,6 +106,14 @@ public class PulsarConnectorConfig implements AutoCloseable {
         this.brokerServiceUrl = brokerServiceUrl;
         return this;
     }
+    public String getBrokerBinaryServiceUrl() {
+        return this.brokerBinaryServiceUrl;
+    }
+    @Config("pulsar.broker-binary-service-url")
+    public PulsarConnectorConfig setBrokerBinaryServiceUrl(String brokerBinaryServiceUrl) {
+        this.brokerBinaryServiceUrl = brokerBinaryServiceUrl;
+        return this;
+    }
     @Config("pulsar.web-service-url")
     public PulsarConnectorConfig setWebServiceUrl(String webServiceUrl) {
         this.webServiceUrl = webServiceUrl;
@@ -238,6 +249,16 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    public boolean getAuthorizationEnabled() {
+        return authorizationEnabled;
+    }
+
+    @Config("pulsar.authorization-enabled")
+    public PulsarConnectorConfig setAuthorizationEnabled(boolean authorizationEnabled) {
+        this.authorizationEnabled = authorizationEnabled;
+        return this;
+    }
+
     // --- Ledger Offloading ---
 
     public int getManagedLedgerOffloadMaxThreads() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
index 09e517a4492..f5cf78e4153 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java
@@ -55,6 +55,7 @@ public class PulsarConnectorModule implements Module {
         binder.bind(PulsarMetadata.class).in(Scopes.SINGLETON);
         binder.bind(PulsarSplitManager.class).in(Scopes.SINGLETON);
         binder.bind(PulsarRecordSetProvider.class).in(Scopes.SINGLETON);
+        binder.bind(PulsarAuth.class).in(Scopes.SINGLETON);
 
         binder.bind(PulsarDispatchingRowDecoderFactory.class).in(Scopes.SINGLETON);
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 411261430a0..6a260cc9b14 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -76,6 +76,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     private final PulsarConnectorConfig pulsarConnectorConfig;
 
     private final PulsarDispatchingRowDecoderFactory decoderFactory;
+    private final PulsarAuth pulsarAuth;
 
     private static final String INFORMATION_SCHEMA = "information_schema";
 
@@ -95,10 +96,11 @@ public class PulsarMetadata implements ConnectorMetadata {
 
     @Inject
     public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig,
-                          PulsarDispatchingRowDecoderFactory decoderFactory) {
+                          PulsarDispatchingRowDecoderFactory decoderFactory, PulsarAuth pulsarAuth) {
         this.decoderFactory = decoderFactory;
         this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
         this.pulsarConnectorConfig = pulsarConnectorConfig;
+        this.pulsarAuth = pulsarAuth;
         try {
             this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
         } catch (PulsarClientException e) {
@@ -128,6 +130,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     @Override
     public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
         TopicName topicName = getMatchedTopicName(tableName);
+        checkTopicAuthorization(session, topicName.toString());
         return new PulsarTableHandle(
                 this.connectorId,
                 tableName.getSchemaName(),
@@ -155,7 +158,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
         ConnectorTableMetadata connectorTableMetadata;
         SchemaTableName schemaTableName = convertTableHandle(table).toSchemaTableName();
-        connectorTableMetadata = getTableMetadata(schemaTableName, true);
+        connectorTableMetadata = getTableMetadata(session, schemaTableName, true);
         if (connectorTableMetadata == null) {
             ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
             connectorTableMetadata = new ConnectorTableMetadata(schemaTableName, builder.build());
@@ -205,7 +208,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
         PulsarTableHandle pulsarTableHandle = convertTableHandle(tableHandle);
 
-        ConnectorTableMetadata tableMetaData = getTableMetadata(pulsarTableHandle.toSchemaTableName(), false);
+        ConnectorTableMetadata tableMetaData = getTableMetadata(session, pulsarTableHandle.toSchemaTableName(), false);
         if (tableMetaData == null) {
             return new HashMap<>();
         }
@@ -243,8 +246,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     @Override
     public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle
             columnHandle) {
-
-        convertTableHandle(tableHandle);
+        PulsarTableHandle handle = convertTableHandle(tableHandle);
         return convertColumnHandle(columnHandle).getColumnMetadata();
     }
 
@@ -264,7 +266,7 @@ public class PulsarMetadata implements ConnectorMetadata {
         }
 
         for (SchemaTableName tableName : tableNames) {
-            ConnectorTableMetadata connectorTableMetadata = getTableMetadata(tableName, true);
+            ConnectorTableMetadata connectorTableMetadata = getTableMetadata(session, tableName, true);
             if (connectorTableMetadata != null) {
                 columns.put(tableName, connectorTableMetadata.getColumns());
             }
@@ -273,7 +275,15 @@ public class PulsarMetadata implements ConnectorMetadata {
         return columns.build();
     }
 
-    private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
+    @Override
+    public void cleanupQuery(ConnectorSession session) {
+        if (pulsarConnectorConfig.getAuthorizationEnabled()) {
+            pulsarAuth.cleanSession(session);
+        }
+    }
+
+    private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName,
+                                                    boolean withInternalColumns) {
 
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
@@ -281,6 +291,8 @@ public class PulsarMetadata implements ConnectorMetadata {
 
         TopicName topicName = getMatchedTopicName(schemaTableName);
 
+        checkTopicAuthorization(session, topicName.toString());
+
         SchemaInfo schemaInfo;
         try {
             schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(topicName.getSchemaName());
@@ -412,4 +424,11 @@ public class PulsarMetadata implements ConnectorMetadata {
         return TopicName.get(matchedTopics.get(0));
     }
 
+    void checkTopicAuthorization(ConnectorSession session, String topic) {
+        if (!pulsarConnectorConfig.getAuthorizationEnabled()) {
+            return;
+        }
+        pulsarAuth.checkTopicAuth(session, topic);
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
new file mode 100644
index 00000000000..e763a8e03e5
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarAuth.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.security.ConnectorIdentity;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import javax.crypto.SecretKey;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestPulsarAuth extends MockedPulsarServiceBaseTest {
+    private SecretKey secretKey;
+    private final String SUPER_USER_ROLE = "admin";
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(
+                Sets.newHashSet("org.apache.pulsar.broker.authentication.AuthenticationProviderToken"));
+        conf.setAuthorizationEnabled(true);
+        secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+        conf.setProperties(properties);
+        conf.setSuperUserRoles(Sets.newHashSet(SUPER_USER_ROLE));
+        conf.setClusterName("c1");
+        internalSetup();
+
+        admin.clusters().createCluster("c1", ClusterData.builder().build());
+        admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet(SUPER_USER_ROLE), Sets.newHashSet("c1")));
+        waitForChange();
+        admin.namespaces().createNamespace("p1/c1/ns1");
+        waitForChange();
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        pulsarAdminBuilder.authentication(
+                AuthenticationFactory.token(AuthTokenUtils.createToken(secretKey, SUPER_USER_ROLE, Optional.empty())));
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConfigCheck() {
+        PulsarConnectorConfig pulsarConnectorConfig = new PulsarConnectorConfig();
+        pulsarConnectorConfig.setAuthorizationEnabled(true);
+        pulsarConnectorConfig.setBrokerBinaryServiceUrl("");
+
+        new PulsarAuth(pulsarConnectorConfig);
+    }
+
+    @Test
+    public void testEmptyExtraCredentials() {
+        PulsarConnectorConfig pulsarConnectorConfig = mock(PulsarConnectorConfig.class);
+
+        doReturn(true).when(pulsarConnectorConfig).getAuthorizationEnabled();
+        doReturn(pulsar.getBrokerServiceUrl()).when(pulsarConnectorConfig).getBrokerBinaryServiceUrl();
+
+        PulsarAuth pulsarAuth = new PulsarAuth(pulsarConnectorConfig);
+
+        ConnectorSession session = mock(ConnectorSession.class);
+        ConnectorIdentity identity = mock(ConnectorIdentity.class);
+        doReturn("query-1").when(session).getQueryId();
+        doReturn(identity).when(session).getIdentity();
+
+        // Test empty extra credentials map
+        doReturn(new HashMap<String, String>()).when(identity).getExtraCredentials();
+        try {
+            pulsarAuth.checkTopicAuth(session, "test");
+            Assert.fail(); // should fail
+        } catch (PrestoException e) {
+            Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("The credential information is empty"));
+        }
+
+        // Test empty extra credentials parameters
+        doReturn(new HashMap<String, String>() {{
+            put("auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+        }}).when(identity).getExtraCredentials();
+        try {
+            pulsarAuth.checkTopicAuth(session, "test");
+            Assert.fail(); // should fail
+        } catch (PrestoException e) {
+            Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("Please specify the auth-method and auth-params"));
+        }
+
+        doReturn(new HashMap<String, String>() {{
+            put("auth-params", "test-token");
+        }}).when(identity).getExtraCredentials();
+        try {
+            pulsarAuth.checkTopicAuth(session, "test");
+            Assert.fail(); // should fail
+        } catch (PrestoException e) {
+            Assert.assertEquals(QUERY_REJECTED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("Please specify the auth-method and auth-params"));
+        }
+    }
+
+    /** Checks granted, ungranted, invalid-token and partitioned-topic access against a token-secured broker. */
+    @Test
+    public void testPulsarSqlAuth() throws PulsarAdminException {
+        String passRole = RandomStringUtils.randomAlphabetic(4) + "-pass";
+        String deniedRole = RandomStringUtils.randomAlphabetic(4) + "-denied";
+        String topic = "persistent://p1/c1/ns1/" + RandomStringUtils.randomAlphabetic(4);
+        String otherTopic = "persistent://p1/c1/ns1/" + RandomStringUtils.randomAlphabetic(4) + "-other";
+        String partitionedTopic = "persistent://p1/c1/ns1/" + RandomStringUtils.randomAlphabetic(4);
+        String passToken = AuthTokenUtils.createToken(secretKey, passRole, Optional.empty());
+        String deniedToken = AuthTokenUtils.createToken(secretKey, deniedRole, Optional.empty());
+
+        admin.topics().grantPermission(topic, passRole, EnumSet.of(AuthAction.consume));
+        admin.topics().createPartitionedTopic(partitionedTopic, 2);
+        admin.topics().grantPermission(partitionedTopic, passRole, EnumSet.of(AuthAction.consume));
+        waitForChange();
+
+        ConnectorSession session = mock(ConnectorSession.class);
+        ConnectorIdentity identity = mock(ConnectorIdentity.class);
+        PulsarConnectorConfig pulsarConnectorConfig = mock(PulsarConnectorConfig.class);
+
+        doReturn(true).when(pulsarConnectorConfig).getAuthorizationEnabled();
+        doReturn(pulsar.getBrokerServiceUrl()).when(pulsarConnectorConfig).getBrokerBinaryServiceUrl();
+
+        doReturn("query-1").when(session).getQueryId();
+        doReturn(identity).when(session).getIdentity();
+
+        doReturn(new HashMap<String, String>() {{
+            put("auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+            put("auth-params", passToken);
+        }}).when(identity).getExtraCredentials();
+
+        PulsarAuth pulsarAuth = new PulsarAuth(pulsarConnectorConfig);
+
+        pulsarAuth.checkTopicAuth(session, topic); // should pass
+
+        // authorizedQueryTopicsMap should contain the authorized query and topic.
+        Assert.assertTrue(
+                pulsarAuth.authorizedQueryTopicsMap.containsKey(session.getQueryId()));
+        Assert.assertTrue(pulsarAuth.authorizedQueryTopicsMap.get(session.getQueryId()).contains(topic));
+
+        // Using the authorized query but not authorized topic should fail.
+        // This part of the test case is for the case where a query accesses multiple topics but only some of them
+        // have permission.
+        try {
+            pulsarAuth.checkTopicAuth(session, otherTopic);
+            Assert.fail(); // should fail
+        } catch (PrestoException e){
+            Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("not authorized"));
+        }
+
+        // test clean session
+        pulsarAuth.cleanSession(session);
+        Assert.assertFalse(pulsarAuth.authorizedQueryTopicsMap.containsKey(session.getQueryId()));
+
+        // Use a fresh query id: an invalid token should fail authentication.
+        doReturn("query-2").when(session).getQueryId();
+
+        try{
+            doReturn(new HashMap<String, String>() {{
+                put("auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+                put("auth-params", "invalid-token");
+            }}).when(identity).getExtraCredentials();
+            pulsarAuth.checkTopicAuth(session, topic);
+            Assert.fail(); // should fail
+        } catch (PrestoException e){
+            Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("Unable to authenticate"));
+        }
+
+        pulsarAuth.cleanSession(session);
+        Assert.assertTrue(pulsarAuth.authorizedQueryTopicsMap.isEmpty());
+
+        doReturn("query-3").when(session).getQueryId();
+
+        try{
+            doReturn(new HashMap<String, String>() {{
+                put("auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+                put("auth-params", deniedToken);
+            }}).when(identity).getExtraCredentials();
+            pulsarAuth.checkTopicAuth(session, topic);
+            Assert.fail(); // should fail
+        } catch (PrestoException e){
+            Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
+            Assert.assertTrue(e.getMessage().contains("not authorized"));
+        }
+
+        pulsarAuth.cleanSession(session);
+
+        // The role that was granted consume permission should also pass on the partitioned topic.
+        doReturn(new HashMap<String, String>() {{
+            put("auth-plugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
+            put("auth-params", passToken);
+        }}).when(identity).getExtraCredentials();
+        pulsarAuth.checkTopicAuth(session, partitionedTopic); // should pass for the partitioned topic case
+
+        pulsarAuth.cleanSession(session);
+        Assert.assertTrue(pulsarAuth.authorizedQueryTopicsMap.isEmpty());
+    }
+
+    private static void waitForChange() {
+        try {
+            Thread.sleep(100);
+        } catch (InterruptedException ignored) {
+        }
+    }
+}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7db32f59148..fc13647e905 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -97,6 +97,8 @@ public abstract class TestPulsarConnector {
 
     protected PulsarMetadata pulsarMetadata;
 
+    protected PulsarAuth pulsarAuth;
+
     protected PulsarAdmin pulsarAdmin;
 
     protected Schemas schemas;
@@ -367,7 +369,9 @@ public abstract class TestPulsarConnector {
         pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
         PulsarDispatchingRowDecoderFactory dispatchingRowDecoderFactory =
                 new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager());
-        PulsarMetadata pulsarMetadata = new PulsarMetadata(pulsarConnectorId, pulsarConnectorConfig, dispatchingRowDecoderFactory);
+        PulsarAuth pulsarAuth = new PulsarAuth(pulsarConnectorConfig);
+        PulsarMetadata pulsarMetadata =
+                new PulsarMetadata(pulsarConnectorId, pulsarConnectorConfig, dispatchingRowDecoderFactory, pulsarAuth);
         return pulsarMetadata;
     }
 
@@ -539,7 +543,11 @@ public abstract class TestPulsarConnector {
         doReturn(schemas).when(pulsarAdmin).schemas();
         doReturn(pulsarAdmin).when(this.pulsarConnectorConfig).getPulsarAdmin();
 
-        this.pulsarMetadata = new PulsarMetadata(pulsarConnectorId, this.pulsarConnectorConfig, dispatchingRowDecoderFactory);
+        this.pulsarAuth = mock(PulsarAuth.class);
+
+        this.pulsarMetadata =
+                new PulsarMetadata(pulsarConnectorId, this.pulsarConnectorConfig, dispatchingRowDecoderFactory,
+                        this.pulsarAuth);
         this.pulsarSplitManager = Mockito.spy(new PulsarSplitManager(pulsarConnectorId, this.pulsarConnectorConfig));
 
         ManagedLedgerFactory managedLedgerFactory = mock(ManagedLedgerFactory.class);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index f3aaf75ed9d..5335d8ad8f7 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -26,6 +26,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.ClientErrorException;
@@ -35,6 +37,7 @@ import java.util.stream.Collectors;
 
 import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
 import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.*;
 
@@ -377,4 +380,59 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         assertTrue(fieldNames.isEmpty());
     }
+
+    /**
+     * Exercises the authorization hook of {@code PulsarMetadata} against the mocked {@code PulsarAuth}.
+     *
+     * <p>While {@code checkTopicAuth} is stubbed to do nothing, {@code getTableHandle},
+     * {@code getTableMetadata} and {@code getColumnHandles} must return normally. Once it is stubbed
+     * to throw a {@code PERMISSION_DENIED} {@code PrestoException}, each of the three calls must
+     * surface that error. Finally, {@code cleanupQuery} must invoke {@code cleanSession} exactly once.
+     */
+    @Test
+    public void testPulsarAuthCheck() {
+        this.pulsarConnectorConfig.setAuthorizationEnabled(true);
+
+        // Fixtures shared by the "allowed" and "denied" phases below; both describe TOPIC_1.
+        SchemaTableName tableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName());
+        PulsarTableHandle tableHandle = new PulsarTableHandle(
+                TOPIC_1.toString(),
+                TOPIC_1.getNamespace(),
+                TOPIC_1.getLocalName(),
+                TOPIC_1.getLocalName());
+
+        // Phase 1: the auth check is a no-op, so every metadata call must pass.
+        doNothing().when(this.pulsarAuth).checkTopicAuth(isA(ConnectorSession.class), isA(String.class));
+        this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), tableName);
+        this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class), tableHandle);
+        this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), tableHandle);
+
+        // Phase 2: the auth check rejects every topic, so every metadata call must fail.
+        doThrow(new PrestoException(PERMISSION_DENIED, "not authorized")).when(this.pulsarAuth)
+                .checkTopicAuth(isA(ConnectorSession.class), isA(String.class));
+        assertPermissionDenied(
+                () -> this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), tableName),
+                "Test getTableHandle should fail the auth check");
+        assertPermissionDenied(
+                () -> this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class), tableHandle),
+                "Test getTableMetadata should fail the auth check");
+        assertPermissionDenied(
+                () -> this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), tableHandle),
+                "Test getColumnHandles should fail the auth check");
+
+        // Cleaning up the query must release the auth state for the session exactly once.
+        this.pulsarMetadata.cleanupQuery(mock(ConnectorSession.class));
+        Mockito.verify(this.pulsarAuth, Mockito.times(1)).cleanSession(isA(ConnectorSession.class));
+    }
+
+    /**
+     * Runs {@code call} and requires it to throw a {@code PrestoException} carrying the
+     * {@code PERMISSION_DENIED} error code.
+     *
+     * @param call           the metadata call expected to be rejected
+     * @param failureMessage message of the assertion failure raised when {@code call} returns normally
+     */
+    private static void assertPermissionDenied(Runnable call, String failureMessage) {
+        try {
+            call.run();
+            Assert.fail(failureMessage); // should fail
+        } catch (PrestoException e) {
+            Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
+        }
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
index e5ceb321aae..005ff8cd703 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/AbstractDecoderTester.java
@@ -30,6 +30,7 @@ import java.math.BigDecimal;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarAuth;
 import org.apache.pulsar.sql.presto.PulsarColumnHandle;
 import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
 import org.apache.pulsar.sql.presto.PulsarConnectorConfig;
@@ -67,7 +68,8 @@ public abstract class AbstractDecoderTester {
         this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1);
         this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
         this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
-        this.pulsarMetadata = new PulsarMetadata(pulsarConnectorId, this.pulsarConnectorConfig, decoderFactory);
+        this.pulsarMetadata = new PulsarMetadata(pulsarConnectorId, this.pulsarConnectorConfig, decoderFactory,
+                new PulsarAuth(this.pulsarConnectorConfig));
         this.topicName = TopicName.get("persistent", NamespaceName.get("tenant-1", "ns-1"), "topic-1");
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
new file mode 100644
index 00000000000..281ae376df6
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLAuth.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.crypto.SecretKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.PrestoWorkerContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the authentication and authorization checks of Pulsar SQL.
+ *
+ * <p>Brokers are started with token authentication and authorization enabled and {@code admin} as the
+ * super-user role. Queries are issued through the SQL CLI with per-query {@code auth-plugin} /
+ * {@code auth-params} extra credentials, and the tests assert on the CLI exit code and stderr.
+ */
+@Slf4j
+public class TestPulsarSQLAuth extends TestPulsarSQLBase {
+    /** Client authentication plugin used by the worker, the admin client and every query. */
+    private static final String TOKEN_AUTH_PLUGIN = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
+
+    private SecretKey secretKey;
+    private String adminToken;
+    private PulsarAdmin admin;
+
+    /** Extends the base cluster spec so that a Presto worker container is started. */
+    @Override
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+        specBuilder = super.beforeSetupCluster(clusterName, specBuilder);
+        specBuilder.enablePrestoWorker(true);
+        return specBuilder;
+    }
+
+    /**
+     * Generates the signing key and the {@code admin} token, then pushes the security settings into
+     * the broker containers and the Presto worker container through environment variables.
+     */
+    @Override
+    protected void beforeStartCluster() {
+        secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+        adminToken = AuthTokenUtils.createToken(secretKey, "admin", Optional.empty());
+
+        Map<String, String> envMap = new HashMap<>();
+        envMap.put("authenticationEnabled", "true");
+        envMap.put("authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+        envMap.put("authorizationEnabled", "true");
+        envMap.put("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+        envMap.put("superUserRoles", "admin");
+        envMap.put("brokerDeleteInactiveTopicsEnabled", "false");
+
+        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+            brokerContainer.withEnv(envMap);
+        }
+
+        // The worker itself connects with the admin token; per-query credentials are supplied later.
+        // NOTE(review): the last two keys are passed without the SQL_PREFIX_ prefix used by the first
+        // two -- confirm the worker's config script still applies them.
+        PrestoWorkerContainer prestoWorkerContainer = pulsarCluster.getPrestoWorkerContainer();
+        prestoWorkerContainer
+                .withEnv("SQL_PREFIX_pulsar.auth-plugin", TOKEN_AUTH_PLUGIN)
+                .withEnv("SQL_PREFIX_pulsar.auth-params", adminToken)
+                .withEnv("pulsar.broker-binary-service-url", "pulsar://pulsar-broker-0:6650")
+                .withEnv("pulsar.authorization-enabled", "true");
+    }
+
+    /** Starts the cluster, opens the JDBC connection and builds an admin client using the admin token. */
+    @Override
+    public void setupCluster() throws Exception {
+        super.setupCluster();
+        initJdbcConnection();
+        admin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                .authentication(TOKEN_AUTH_PLUGIN, adminToken)
+                .build();
+    }
+
+    /** Closes the admin client created in {@link #setupCluster()} and then tears the cluster down. */
+    @Override
+    public void tearDownCluster() throws Exception {
+        try {
+            // setupCluster may have failed before the client was built, hence the null check.
+            if (admin != null) {
+                admin.close();
+                admin = null;
+            }
+        } finally {
+            // Always tear the cluster down, even when closing the client throws.
+            super.tearDownCluster();
+        }
+    }
+
+    /**
+     * Queries a single topic with three different tokens: one whose role was granted
+     * {@code consume} (must succeed), a string that is not a valid token (must be rejected with
+     * "Unable to authenticate") and a valid token whose role has no grant (must be rejected with
+     * "not authorized").
+     */
+    @Test
+    public void testPulsarSQLAuthCheck() throws PulsarAdminException {
+        String passRole = RandomStringUtils.randomAlphabetic(4) + "-pass";
+        String deniedRole = RandomStringUtils.randomAlphabetic(4) + "-denied";
+        String passToken = AuthTokenUtils.createToken(secretKey, passRole, Optional.empty());
+        String deniedToken = AuthTokenUtils.createToken(secretKey, deniedRole, Optional.empty());
+        String topic = "testPulsarSQLAuthCheck";
+
+        admin.topics().grantPermission(topic, passRole, EnumSet.of(AuthAction.consume));
+
+        admin.topics().createNonPartitionedTopic(topic);
+
+        String queryAllDataSql = String.format("select * from pulsar.\"%s\".\"%s\";", "public/default", topic);
+
+        assertQuerySucceeds(queryAllDataSql, passToken);
+
+        // Authentication error: the credential is not a valid token at all.
+        assertQueryRejected(queryAllDataSql, "invalid-token", "Unable to authenticate");
+
+        // Authorization error: the token is valid but its role was never granted consume.
+        assertQueryRejected(queryAllDataSql, deniedToken, "not authorized");
+    }
+
+    /**
+     * Queries two topics at once (one non-partitioned, one partitioned). The query must be rejected
+     * while the role holds {@code consume} on the first topic only, and must succeed once
+     * {@code consume} is also granted on the second.
+     */
+    @Test
+    public void testCheckAuthForMultipleTopics() throws PulsarAdminException {
+        String testRole = RandomStringUtils.randomAlphabetic(4) + "-test";
+        String testToken = AuthTokenUtils.createToken(secretKey, testRole, Optional.empty());
+        String topic1 = "testCheckAuthForMultipleTopics1";
+        String topic2 = "testCheckAuthForMultipleTopics2";
+
+        admin.topics().grantPermission(topic1, testRole, EnumSet.of(AuthAction.consume));
+
+        admin.topics().createNonPartitionedTopic(topic1);
+
+        admin.topics().createPartitionedTopic(topic2, 2); // Test for partitioned topic
+
+        String queryAllDataSql =
+                String.format("select * from pulsar.\"public/default\".\"%s\", pulsar.\"public/default\".\"%s\";",
+                        topic1, topic2);
+
+        // Authorization error: nothing has been granted on topic2 yet. Only the exit code is checked.
+        assertQueryRejected(queryAllDataSql, testToken, null);
+
+        admin.topics().grantPermission(topic2, testRole, EnumSet.of(AuthAction.consume));
+
+        assertQuerySucceeds(queryAllDataSql, testToken);
+    }
+
+    /**
+     * Builds the extra-credential map that makes a query authenticate with {@code token} through the
+     * token authentication plugin.
+     */
+    private static Map<String, String> tokenCredentials(String token) {
+        Map<String, String> credentials = new HashMap<>();
+        credentials.put("auth-plugin", TOKEN_AUTH_PLUGIN);
+        credentials.put("auth-params", token);
+        return credentials;
+    }
+
+    /**
+     * Polls until {@code sql}, executed with {@code token}, exits with code 0. A
+     * {@code ContainerExecException} is converted into an assertion failure so that the attempt is
+     * retried by {@link #assertSQLExecution}.
+     */
+    private void assertQuerySucceeds(String sql, String token) {
+        assertSQLExecution(() -> {
+            try {
+                ContainerExecResult containerExecResult = execQuery(sql, tokenCredentials(token));
+                Assert.assertEquals(0, containerExecResult.getExitCode());
+            } catch (ContainerExecException e) {
+                Assert.fail(String.format("assertSQLExecution fail: %s", e.getLocalizedMessage()));
+            }
+        });
+    }
+
+    /**
+     * Polls until {@code sql}, executed with {@code token}, fails with exit code 1.
+     *
+     * @param sql            the statement to execute
+     * @param token          value sent as the {@code auth-params} extra credential
+     * @param expectedStderr text that stderr must contain, or {@code null} to skip the stderr check
+     */
+    private void assertQueryRejected(String sql, String token, String expectedStderr) {
+        assertSQLExecution(() -> {
+            try {
+                execQuery(sql, tokenCredentials(token));
+                Assert.fail("Should not pass");
+            } catch (ContainerExecException e) {
+                Assert.assertEquals(1, e.getResult().getExitCode());
+                log.info(e.getResult().getStderr());
+                if (expectedStderr != null) {
+                    Assert.assertTrue(e.getResult().getStderr().contains(expectedStderr));
+                }
+            }
+        });
+    }
+
+    /**
+     * Re-runs {@code assertion} until it stops throwing: first attempt immediately, then every
+     * 3 seconds, giving up after 15 seconds.
+     */
+    private void assertSQLExecution(org.awaitility.core.ThrowingRunnable assertion) {
+        Awaitility.await()
+                .pollDelay(Duration.ofMillis(0))
+                .pollInterval(Duration.ofSeconds(3))
+                .atMost(Duration.ofSeconds(15))
+                .untilAsserted(assertion);
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 0626e3522e8..74d373ccc45 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -28,6 +28,7 @@ import java.sql.Timestamp;
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CompressionType;
@@ -267,16 +268,33 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
     }
 
+    /**
+     * Executes {@code query} through the Pulsar SQL CLI without any extra credentials.
+     *
+     * <p>Delegates to {@link #execQuery(String, Map)} with a {@code null} credential map, for which
+     * no {@code --extra-credential} option is added to the command line.
+     *
+     * @param query SQL text to run
+     * @return the result returned by {@link #execQuery(String, Map)}
+     * @throws Exception propagated from {@link #execQuery(String, Map)}
+     */
     public ContainerExecResult execQuery(final String query) throws Exception {
+        return execQuery(query, null);
+    }
+
+    public ContainerExecResult execQuery(final String query, Map<String, String> extraCredentials) throws Exception {
         ContainerExecResult containerExecResult;
 
+        StringBuilder extraCredentialsString = new StringBuilder(" ");
+
+        if (extraCredentials != null) {
+            for (Map.Entry<String, String> entry : extraCredentials.entrySet()) {
+                extraCredentialsString.append(
+                        String.format("--extra-credential %s=%s ", entry.getKey(), entry.getValue()));
+            }
+        }
+
         containerExecResult = pulsarCluster.getPrestoWorkerContainer()
-                .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
+                .execCmd("/bin/bash", "-c",
+                        PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql" + extraCredentialsString + "--execute " + "'"
+                                + query + "'");
 
         Stopwatch sw = Stopwatch.createStarted();
         while (containerExecResult.getExitCode() != 0 && sw.elapsed(TimeUnit.SECONDS) < 120) {
             TimeUnit.MILLISECONDS.sleep(500);
             containerExecResult = pulsarCluster.getPrestoWorkerContainer()
-                    .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
+                    .execCmd("/bin/bash", "-c",
+                            PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql" + extraCredentialsString + "--execute " + "'"
+                                    + query + "'");
         }
 
         return containerExecResult;
diff --git a/tests/integration/src/test/resources/pulsar-sql.xml b/tests/integration/src/test/resources/pulsar-sql.xml
index bedc443f6c9..1ab4d479ad0 100644
--- a/tests/integration/src/test/resources/pulsar-sql.xml
+++ b/tests/integration/src/test/resources/pulsar-sql.xml
@@ -24,6 +24,7 @@
         <classes>
             <class name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
             <class name="org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage" />
+            <class name="org.apache.pulsar.tests.integration.presto.TestPulsarSQLAuth" />
         </classes>
     </test>
 </suite>