Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/05 14:35:58 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #15454: Fix authorization provider issue.

Technoboy- opened a new pull request, #15454:
URL: https://github.com/apache/pulsar/pull/15454

   ### Motivation
   When the `roleClaim` value is a String, the provider returns an empty list of roles.
   https://github.com/apache/pulsar/blob/afc48e64cb161eb3fd1e00f5f997ea8f00fe66ab/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java#L113-L126
   It should return a singleton list containing that role instead.
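
As a rough illustration of the intended behavior, the sketch below normalizes a claim value that may be either a single String or a list of Strings. `RoleClaimSketch` and `extractRoles` are hypothetical names invented for this example, and the assumption that the decoded claim arrives as a plain `Object` is made only for illustration; this is not the provider's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not the MultiRolesTokenAuthorizationProvider code.
final class RoleClaimSketch {

    // Normalizes a decoded claim value into a list of role names.
    static List<String> extractRoles(Object claimValue) {
        if (claimValue instanceof String) {
            // A single role encoded as a plain string: wrap it instead of dropping it.
            return Collections.singletonList((String) claimValue);
        }
        if (claimValue instanceof List) {
            List<String> roles = new ArrayList<>();
            for (Object item : (List<?>) claimValue) {
                if (item instanceof String) {
                    roles.add((String) item);
                }
            }
            return roles;
        }
        // Missing claim or unsupported type: no roles.
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        System.out.println(extractRoles("admin"));
        System.out.println(extractRoles(Arrays.asList("reader", "writer")));
        System.out.println(extractRoles(null));
    }
}
```

The String branch is the case described above: without it, a token carrying one role as a plain string would be treated as having no roles at all.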
   
   The `authorize` method runs on the common pool thread and makes a synchronous call:
   https://github.com/apache/pulsar/blob/afc48e64cb161eb3fd1e00f5f997ea8f00fe66ab/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java#L137-L161
   If line 141 throws an exception, the future never completes.
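
The failure mode described here (a manually created future that is never completed when the work throws) can be sketched as follows. `SafeAsyncSketch` and `callAsync` are hypothetical names, and passing the executor as a parameter is an assumption made for the example; the point is only that every code path completes the future, either normally or exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not code from the pull request.
final class SafeAsyncSketch {

    // Runs a possibly blocking call on the given executor and always completes the future.
    static <T> CompletableFuture<T> callAsync(Supplier<T> blockingCall, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
            executor.execute(() -> {
                try {
                    result.complete(blockingCall.get());
                } catch (Throwable t) {
                    // Without this branch, a thrown exception would leave the future pending forever.
                    result.completeExceptionally(t);
                }
            });
        } catch (Throwable t) {
            // The executor itself may refuse the task.
            result.completeExceptionally(t);
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = callAsync(() -> {
            throw new IllegalStateException("lookup failed");
        }, ForkJoinPool.commonPool());
        failed.exceptionally(t -> "recovered from: " + t.getMessage())
              .thenAccept(System.out::println)
              .join();
    }
}
```

`CompletableFuture.supplyAsync` gives the same guarantee without the manual `try`/`catch`; the explicit version is shown because the problem arises when a future is created and completed by hand.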
   
   
   ### Documentation
   
   - [x] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15454: Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867353431


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)
+                    .filter(tester)
+                    .findFirst();
+            if (!value.isPresent()) {
+                if (futures.size() == 0) {
+                    return CompletableFuture.completedFuture(Optional.empty());
+                }
+                return waitForAny(futures, tester);
+            }
+            futures.forEach(f -> {
+                if (!f.isDone()) {
+                    f.cancel(true);
+                }
+            });

Review Comment:
   Should we also cancel the remaining futures before line 64?
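
One way to picture the cancellation being asked for is the sketch below. It is not the implementation in this pull request: `WaitForAnySketch` and `firstMatching` are hypothetical names, and it uses a callback-and-counter approach rather than the recursive `waitForAny` in the diff. It assumes the caller is happy for every still-pending future to be cancelled as soon as one result passes the predicate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical alternative for illustration; not the FutureUtil code under review.
final class WaitForAnySketch {

    // Completes with the first value accepted by the tester, or with empty if none matches.
    static <T> CompletableFuture<Optional<T>> firstMatching(
            List<CompletableFuture<T>> futures, Predicate<T> tester) {
        CompletableFuture<Optional<T>> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(Optional.empty());
            return result;
        }
        AtomicInteger pending = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                try {
                    if (error == null && value != null && tester.test(value)
                            && result.complete(Optional.of(value))) {
                        // A match was found: cancel whatever has not finished yet.
                        futures.forEach(other -> other.cancel(true));
                    }
                } catch (Throwable t) {
                    // A throwing tester must not leave the result pending.
                    result.completeExceptionally(t);
                } finally {
                    // Once every input is done without a match, report empty.
                    if (pending.decrementAndGet() == 0) {
                        result.complete(Optional.empty());
                    }
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> slow = new CompletableFuture<>();
        CompletableFuture<Integer> fast = new CompletableFuture<>();
        List<CompletableFuture<Integer>> inputs = Arrays.asList(slow, fast);
        CompletableFuture<Optional<Integer>> result = firstMatching(inputs, v -> v > 10);
        fast.complete(42);
        System.out.println(result.join() + ", slow cancelled: " + slow.isCancelled());
    }
}
```

Note that `CompletableFuture.cancel` only completes the future with a `CancellationException`; it does not interrupt whatever work was going to complete it.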





[GitHub] [pulsar] nodece commented on a diff in pull request #15454: Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867353802


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)

Review Comment:
   Can we avoid the `join`? It mixes asynchronous and synchronous operations.





[GitHub] [pulsar] RobertIndie commented on a diff in pull request #15454: Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867361866


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,

Review Comment:
   Let's write some comments to describe this method clearly.



##########
pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java:
##########
@@ -136,4 +139,37 @@ public void testGetOriginalException() {
                     assertTrue(future3Exception.get(0) instanceof IllegalStateException);
                 });
     }
+
+    @Test
+    public void testWaitForAny() {

Review Comment:
   We should also cover the case where the other futures are canceled successfully before `waitForAny` finishes.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15454: [improve][broker] Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867426207


##########
pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java:
##########
@@ -136,4 +139,37 @@ public void testGetOriginalException() {
                     assertTrue(future3Exception.get(0) instanceof IllegalStateException);
                 });
     }
+
+    @Test
+    public void testWaitForAny() {

Review Comment:
   Right, fixed.





[GitHub] [pulsar] Technoboy- merged pull request #15454: [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15454:
URL: https://github.com/apache/pulsar/pull/15454




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15454: [improve][broker] Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867425847


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,

Review Comment:
   Fixed.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15454: [improve][broker] Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867425836


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)
+                    .filter(tester)
+                    .findFirst();
+            if (!value.isPresent()) {
+                if (futures.size() == 0) {
+                    return CompletableFuture.completedFuture(Optional.empty());
+                }
+                return waitForAny(futures, tester);
+            }
+            futures.forEach(f -> {
+                if (!f.isDone()) {
+                    f.cancel(true);
+                }
+            });

Review Comment:
   Yes, right. Fixed.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #15454: Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867353859


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)

Review Comment:
   This `join` is only called on `doneFutures`, and all of those futures are already completed, so it does not block.
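
The point about `join` can be checked in isolation with a small sketch. `JoinOnDoneSketch` and `completedValues` are hypothetical names for this example; the filtering mirrors the `isDone` and `isCompletedExceptionally` checks in the diff, which are what make the later `join` safe.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not the FutureUtil code under review.
final class JoinOnDoneSketch {

    // Collects the values of futures that have already completed normally.
    static <T> List<T> completedValues(Collection<CompletableFuture<T>> futures) {
        return futures.stream()
                // Pending futures are skipped, so join() below never has to wait.
                .filter(CompletableFuture::isDone)
                // Failed or cancelled futures are skipped, so join() below never throws.
                .filter(f -> !f.isCompletedExceptionally())
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> pending = new CompletableFuture<>();
        List<CompletableFuture<String>> inputs = Arrays.asList(done, pending);
        System.out.println(completedValues(inputs));
    }
}
```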





[GitHub] [pulsar] nodece commented on a diff in pull request #15454: Fix authorization provider issue.

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15454:
URL: https://github.com/apache/pulsar/pull/15454#discussion_r867354811


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java:
##########
@@ -55,6 +57,36 @@ public static CompletableFuture<Object> waitForAny(Collection<? extends Completa
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                       Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)

Review Comment:
   Thanks for your reply.


