You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/13 02:47:30 UTC

[GitHub] [pulsar] RobertIndie opened a new pull request, #15571: [feature][sql] Add support for pulsar authorization

RobertIndie opened a new pull request, #15571:
URL: https://github.com/apache/pulsar/pull/15571

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*. 
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   
   ### Motivation
   
   Currently, pulsar SQL does not compatible with the authentication and authorization from the pulsar broker. We don't have any security-related integration between Pulsar and Presto.
   
   Although we can implement a presto access control plugin to interface with the pulsar broker's authorization module. But this method does not handle authentication well.
   
   ### Modifications
   
   This PR adds authentication and authorization support between Pulsar and Pulsar SQL by using the extra credentials properties. The request from the pulsar SQL client can have auth-related parameters(like `auth-plugin` and `auth-params`) attached to the extra credentials. The pulsar SQL worker can therefore obtain auth information and use PulsarClient to initiate an auth verification request to the broker. In this way, we can call the broker's authentication and authentication directly on the pulsar SQL worker.
   
   This PR adds a new module `PulsarAuth` to the SQL worker. 
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877007838


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java:
##########
@@ -273,14 +278,24 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
         return columns.build();
     }
 
-    private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
+    @Override
+    public void cleanupQuery(ConnectorSession session) {
+        if (pulsarConnectorConfig.getAuthorizationEnable()) {
+            pulsarAuth.cleanSession(session);
+        }
+    }
+
+    private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName,
+                                                    boolean withInternalColumns) {
 
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
         }
 
         TopicName topicName = getMatchedTopicName(schemaTableName);
 
+        checkTopicAuthorization(session, topicName.toString());

Review Comment:
   Maybe we add auth check here is enough, other methods are all based on this method. The table name in the query will convert to a topic name, and we need to use the topic name to check the authorization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877841586


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java:
##########
@@ -267,16 +268,33 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
     }
 
     public ContainerExecResult execQuery(final String query) throws Exception {
+        return execQuery(query, null);
+    }
+
+    public ContainerExecResult execQuery(final String query, Map<String, String> extraCredentials) throws Exception {
         ContainerExecResult containerExecResult;
 
+        StringBuilder extraCredentialsString = new StringBuilder(" ");

Review Comment:
   It seems it's difficult to determine the capacity of the string builder in advance. It seems not a big deal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877088728


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java:
##########
@@ -63,6 +64,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private boolean namespaceDelimiterRewriteEnable = false;
     private String rewriteNamespaceDelimiter = "/";
 
+    private boolean authorizationEnable = false;

Review Comment:
   authorizationEnable -> authorizationEnabled



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877814912


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java:
##########
@@ -273,14 +278,24 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
         return columns.build();
     }
 
-    private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
+    @Override
+    public void cleanupQuery(ConnectorSession session) {
+        if (pulsarConnectorConfig.getAuthorizationEnable()) {
+            pulsarAuth.cleanSession(session);
+        }
+    }
+
+    private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName,
+                                                    boolean withInternalColumns) {
 
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
         }
 
         TopicName topicName = getMatchedTopicName(schemaTableName);
 
+        checkTopicAuthorization(session, topicName.toString());

Review Comment:
   You're right. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877085101


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java:
##########
@@ -267,16 +268,33 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
     }
 
     public ContainerExecResult execQuery(final String query) throws Exception {
+        return execQuery(query, null);
+    }
+
+    public ContainerExecResult execQuery(final String query, Map<String, String> extraCredentials) throws Exception {
         ContainerExecResult containerExecResult;
 
+        StringBuilder extraCredentialsString = new StringBuilder(" ");

Review Comment:
   Init with capacity seems better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r882300287


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java:
##########
@@ -273,14 +278,24 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
         return columns.build();
     }
 
-    private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
+    @Override
+    public void cleanupQuery(ConnectorSession session) {
+        if (pulsarConnectorConfig.getAuthorizationEnable()) {
+            pulsarAuth.cleanSession(session);
+        }
+    }
+
+    private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName,
+                                                    boolean withInternalColumns) {
 
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
         }
 
         TopicName topicName = getMatchedTopicName(schemaTableName);
 
+        checkTopicAuthorization(session, topicName.toString());

Review Comment:
   There is an interface method `getTableHandle` that doesn't go through the `getTableMetadata` method. So I added an auth check to the `getTableHandle`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r877078867


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * This class implements the authentication and authorization integration between the Pulsar SQL worker and the
+ * Pulsar broker.
+ *
+ * It will check permissions against the session-topic pair by trying to subscribe to a topic using the Pulsar Reader
+ * to check the consumption privilege. The same topic will only be checked once during the same session.
+ */
+public class PulsarAuth {
+
+    private static final Logger log = Logger.get(PulsarAuth.class);
+
+    private final PulsarConnectorConfig pulsarConnectorConfig;
+    private static final String CREDENTIALS_AUTH_PLUGIN = "auth-plugin";
+    private static final String CREDENTIALS_AUTH_PARAMS = "auth-params";
+    @VisibleForTesting
+    final Map<String, Set<String>> authorizedQueryTopicsMap = new ConcurrentHashMap<>();
+
+    @Inject
+    public PulsarAuth(PulsarConnectorConfig pulsarConnectorConfig) {
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
+        if (pulsarConnectorConfig.getAuthorizationEnable() && StringUtils.isEmpty(
+                pulsarConnectorConfig.getBrokerBinaryServiceUrl())) {
+            throw new IllegalArgumentException(
+                    "pulsar.broker-binary-service-url must be presented when the pulsar.authorization-enable is true.");

Review Comment:
   presented -> present?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r880348583


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * This class implements the authentication and authorization integration between the Pulsar SQL worker and the
+ * Pulsar broker.
+ *
+ * It will check permissions against the session-topic pair by trying to subscribe to a topic using the Pulsar Reader
+ * to check the consumption privilege. The same topic will only be checked once during the same session.
+ */
+public class PulsarAuth {
+
+    private static final Logger log = Logger.get(PulsarAuth.class);
+
+    private final PulsarConnectorConfig pulsarConnectorConfig;
+    private static final String CREDENTIALS_AUTH_PLUGIN = "auth-plugin";
+    private static final String CREDENTIALS_AUTH_PARAMS = "auth-params";
+    @VisibleForTesting
+    final Map<String, Set<String>> authorizedQueryTopicsMap = new ConcurrentHashMap<>();
+
+    @Inject
+    public PulsarAuth(PulsarConnectorConfig pulsarConnectorConfig) {
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
+        if (pulsarConnectorConfig.getAuthorizationEnabled() && StringUtils.isEmpty(
+                pulsarConnectorConfig.getBrokerBinaryServiceUrl())) {
+            throw new IllegalArgumentException(
+                    "pulsar.broker-binary-service-url must be present when the pulsar.authorization-enable is true.");
+        }
+    }
+
+    /**
+     * Check if the session has read access to the topic.
+     * It will try to subscribe to that topic using the Pulsar Reader to check the consumption privilege.
+     * The same topic will only be checked once during the same session.
+     */
+    public void checkTopicAuth(ConnectorSession session, String topic) {
+        Set<String> authorizedTopics =
+                authorizedQueryTopicsMap.computeIfAbsent(session.getQueryId(), query -> new HashSet<>());
+        if (authorizedTopics.contains(topic)) {
+            if (log.isDebugEnabled()) {
+                log.debug("The topic %s is already authorized.", topic);
+            }
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Checking the authorization for the topic: %s", topic);
+        }
+        Map<String, String> extraCredentials = session.getIdentity().getExtraCredentials();
+        if (extraCredentials.isEmpty()) { // the extraCredentials won't be null
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: The credential information is empty.",
+                            topic));
+        }
+        String authMethod = extraCredentials.get(CREDENTIALS_AUTH_PLUGIN);
+        String authParams = extraCredentials.get(CREDENTIALS_AUTH_PARAMS);
+        if (StringUtils.isEmpty(authMethod) || StringUtils.isEmpty(authParams)) {
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: Required credential parameters are "
+                                    + "missing. Please specify the auth-method and auth-params in the extra "
+                                    + "credentials.",
+                            topic));
+        }
+        try {
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(pulsarConnectorConfig.getBrokerBinaryServiceUrl())
+                    .authentication(authMethod, authParams)
+                    .build();
+            client.newReader().topic(topic)
+                    // For the case of the partitioned topic, the receiverQueueSize must be greater than 0.
+                    .receiverQueueSize(1)

Review Comment:
   Depends on this PR https://github.com/apache/pulsar/pull/15666 to create the reader with a pause state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#issuecomment-1136987079

   > I thought that Pulsar SQL was moved to Trino.
   > 
   > What about contributing this feature to Trino instead of here ?
   
   AFAIK, pulsar SQL has not been contributed to the trino currently, it is still a work in progress.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #15571:
URL: https://github.com/apache/pulsar/pull/15571


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#issuecomment-1135843259

   I thought that Pulsar SQL was moved to Trino.
   
   What about contributing this feature to Trino instead of here ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#issuecomment-1138393730

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#discussion_r881408982


##########
pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarAuth.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static io.prestosql.spi.StandardErrorCode.PERMISSION_DENIED;
+import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import io.airlift.log.Logger;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * This class implements the authentication and authorization integration between the Pulsar SQL worker and the
+ * Pulsar broker.
+ *
+ * It will check permissions against the session-topic pair by trying to subscribe to a topic using the Pulsar Reader
+ * to check the consumption privilege. The same topic will only be checked once during the same session.
+ */
+public class PulsarAuth {
+
+    private static final Logger log = Logger.get(PulsarAuth.class);
+
+    private final PulsarConnectorConfig pulsarConnectorConfig;
+    private static final String CREDENTIALS_AUTH_PLUGIN = "auth-plugin";
+    private static final String CREDENTIALS_AUTH_PARAMS = "auth-params";
+    @VisibleForTesting
+    final Map<String, Set<String>> authorizedQueryTopicsMap = new ConcurrentHashMap<>();
+
+    @Inject
+    public PulsarAuth(PulsarConnectorConfig pulsarConnectorConfig) {
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
+        if (pulsarConnectorConfig.getAuthorizationEnabled() && StringUtils.isEmpty(
+                pulsarConnectorConfig.getBrokerBinaryServiceUrl())) {
+            throw new IllegalArgumentException(
+                    "pulsar.broker-binary-service-url must be present when the pulsar.authorization-enable is true.");
+        }
+    }
+
+    /**
+     * Check if the session has read access to the topic.
+     * It will try to subscribe to that topic using the Pulsar Reader to check the consumption privilege.
+     * The same topic will only be checked once during the same session.
+     */
+    public void checkTopicAuth(ConnectorSession session, String topic) {
+        Set<String> authorizedTopics =
+                authorizedQueryTopicsMap.computeIfAbsent(session.getQueryId(), query -> new HashSet<>());
+        if (authorizedTopics.contains(topic)) {
+            if (log.isDebugEnabled()) {
+                log.debug("The topic %s is already authorized.", topic);
+            }
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Checking the authorization for the topic: %s", topic);
+        }
+        Map<String, String> extraCredentials = session.getIdentity().getExtraCredentials();
+        if (extraCredentials.isEmpty()) { // the extraCredentials won't be null
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: The credential information is empty.",
+                            topic));
+        }
+        String authMethod = extraCredentials.get(CREDENTIALS_AUTH_PLUGIN);
+        String authParams = extraCredentials.get(CREDENTIALS_AUTH_PARAMS);
+        if (StringUtils.isEmpty(authMethod) || StringUtils.isEmpty(authParams)) {
+            throw new PrestoException(QUERY_REJECTED,
+                    String.format(
+                            "Failed to check the authorization for topic %s: Required credential parameters are "
+                                    + "missing. Please specify the auth-method and auth-params in the extra "
+                                    + "credentials.",
+                            topic));
+        }
+        try {
+            PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(pulsarConnectorConfig.getBrokerBinaryServiceUrl())
+                    .authentication(authMethod, authParams)
+                    .build();
+            client.newReader().topic(topic)
+                    // For the case of the partitioned topic, the receiverQueueSize must be greater than 0.
+                    .receiverQueueSize(1)

Review Comment:
   I have used the consumer instead of the reader here. PTAL again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie commented on pull request #15571: [feature][sql] Add Pulsar Auth support for the Pulsar SQL

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on PR #15571:
URL: https://github.com/apache/pulsar/pull/15571#issuecomment-1139201395

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org