You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/06 00:42:21 UTC

[pulsar] branch branch-2.10 updated (a585dd9bb33 -> 028b014cbf1)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from a585dd9bb33 [improve][test] Add test case for system topic schema not compatible bug. (#17992)
     new b3f31acf2a3 Pulsar Admin: grab contextual stacktrace for sync methods (#14620)
     new 028b014cbf1 [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 +++
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  2 +-
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  2 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  2 +-
 .../apache/pulsar/broker/admin/v2/Functions.java   |  2 +-
 .../org/apache/pulsar/broker/admin/v2/Worker.java  |  4 +-
 .../apache/pulsar/broker/admin/v2/WorkerStats.java |  2 +-
 .../apache/pulsar/broker/PulsarServiceTest.java    | 58 ++++++++++++++--
 .../pulsar/client/admin/PulsarAdminException.java  | 77 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/BaseResource.java |  9 ++-
 .../client/admin/internal/TopicPoliciesImpl.java   | 15 +----
 11 files changed, 154 insertions(+), 28 deletions(-)


[pulsar] 01/02: Pulsar Admin: grab contextual stacktrace for sync methods (#14620)

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3f31acf2a3c4c0ccebdacb8db083753bb70bdb2
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Mar 29 11:39:17 2022 +0200

    Pulsar Admin: grab contextual stacktrace for sync methods (#14620)
    
    (cherry picked from commit 1730415bece2d779cf2d978f043209b5888ed2f8)
---
 .../pulsar/client/admin/PulsarAdminException.java  | 77 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/BaseResource.java |  9 ++-
 .../client/admin/internal/TopicPoliciesImpl.java   | 15 +----
 3 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 041f8a659f4..6e0285d7d1e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -69,6 +69,15 @@ public class PulsarAdminException extends Exception {
         return statusCode;
     }
 
+    /**
+     * This method is meant to be overriden by all subclasses.
+     * We cannot make it 'abstract' because it would be a breaking change in the public API.
+     * @return a new PulsarAdminException
+     */
+    protected PulsarAdminException clone() {
+        return new PulsarAdminException(getMessage(), getCause(), httpError, statusCode);
+    }
+
     /**
      * Not Authorized Exception.
      */
@@ -76,6 +85,11 @@ public class PulsarAdminException extends Exception {
         public NotAuthorizedException(Throwable t, String httpError, int statusCode) {
             super(httpError, t, httpError, statusCode);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new NotAuthorizedException(getCause(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -85,6 +99,11 @@ public class PulsarAdminException extends Exception {
         public NotFoundException(Throwable t, String httpError, int statusCode) {
             super(httpError, t, httpError, statusCode);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new NotFoundException(getCause(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -94,6 +113,11 @@ public class PulsarAdminException extends Exception {
         public NotAllowedException(Throwable t, String httpError, int statusCode) {
             super(httpError, t, httpError, statusCode);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new NotAllowedException(getCause(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -103,6 +127,11 @@ public class PulsarAdminException extends Exception {
         public ConflictException(Throwable t, String httpError, int statusCode) {
             super(httpError, t, httpError, statusCode);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new ConflictException(getCause(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -112,6 +141,11 @@ public class PulsarAdminException extends Exception {
         public PreconditionFailedException(Throwable t, String httpError, int statusCode) {
             super(httpError, t, httpError, statusCode);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new PreconditionFailedException(getCause(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -121,6 +155,11 @@ public class PulsarAdminException extends Exception {
         public TimeoutException(Throwable t) {
             super(t);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new TimeoutException(getCause());
+        }
     }
 
     /**
@@ -131,9 +170,15 @@ public class PulsarAdminException extends Exception {
             super(message, t, httpError, statusCode);
         }
 
+        @Deprecated
         public ServerSideErrorException(Throwable t) {
             super("Some error occourred on the server", t);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new ServerSideErrorException(getCause(), getMessage(), getHttpError(), getStatusCode());
+        }
     }
 
     /**
@@ -147,6 +192,11 @@ public class PulsarAdminException extends Exception {
         public HttpErrorException(Throwable t) {
             super(t);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new HttpErrorException(getCause());
+        }
     }
 
     /**
@@ -160,6 +210,11 @@ public class PulsarAdminException extends Exception {
         public ConnectException(String message, Throwable t) {
             super(message, t);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new ConnectException(getMessage(), getCause());
+        }
     }
 
     /**
@@ -170,8 +225,30 @@ public class PulsarAdminException extends Exception {
             super(t);
         }
 
+        @Deprecated
         public GettingAuthenticationDataException(String msg) {
             super(msg);
         }
+
+        @Override
+        protected PulsarAdminException clone() {
+            return new GettingAuthenticationDataException(getCause());
+        }
+    }
+
+    /**
+     * Clone the exception and grab the current stacktrace.
+     * @param e a PulsarAdminException
+     * @return a new PulsarAdminException, of the same class.
+     */
+    public static PulsarAdminException wrap(PulsarAdminException e) {
+        PulsarAdminException cloned =  e.clone();
+        if (e.getClass() != cloned.getClass()) {
+            throw new IllegalStateException("Cloning a " + e.getClass() + " generated a "
+                    + cloned.getClass() + ", this is a bug, original error is " + e, e);
+        }
+        // adding a reference to the original exception.
+        cloned.addSuppressed(e);
+        return (PulsarAdminException) cloned.fillInStackTrace();
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 6df43c29b8f..4625c01fcef 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -290,15 +290,18 @@ public abstract class BaseResource {
     protected <T> T sync(Supplier<CompletableFuture<T>> executor) throws PulsarAdminException {
         try {
             return executor.get().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-           throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
           throw new PulsarAdminException(e);
         } catch (TimeoutException e) {
           throw new PulsarAdminException.TimeoutException(e);
+        } catch (ExecutionException e) {
+            // we want to have a stacktrace that points to this point, in order to return a meaninful
+            // stacktrace to the user, otherwise we will have a stacktrace
+            // related to another thread, because all Admin API calls are async
+            throw PulsarAdminException.wrap(getApiException(e.getCause()));
         } catch (Exception e) {
-            throw getApiException(e);
+            throw PulsarAdminException.wrap(getApiException(e));
         }
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 2caf1876f51..bd300377807 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -21,9 +21,6 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -1237,17 +1234,7 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies {
 
     @Override
     public void removeSubscriptionTypesEnabled(String topic) throws PulsarAdminException {
-        try {
-            removeSubscriptionTypesEnabledAsync(topic)
-                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            throw (PulsarAdminException) e.getCause();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarAdminException(e);
-        } catch (TimeoutException e) {
-            throw new PulsarAdminException.TimeoutException(e);
-        }
+        sync(() -> removeSubscriptionTypesEnabledAsync(topic));
     }
 
     @Override


[pulsar] 02/02: [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 028b014cbf18034150269a6b6288ec519faf69bd
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Nov 2 17:40:19 2022 -0700

    [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)
    
    (cherry picked from commit 01e0068f0e01216b5b456ecbaf102cdc4dcef56a)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 ++++
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  2 +-
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  2 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  2 +-
 .../apache/pulsar/broker/admin/v2/Functions.java   |  2 +-
 .../org/apache/pulsar/broker/admin/v2/Worker.java  |  4 +-
 .../apache/pulsar/broker/admin/v2/WorkerStats.java |  2 +-
 .../apache/pulsar/broker/PulsarServiceTest.java    | 58 ++++++++++++++++++++--
 8 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index eb5b62d0de5..b5d11c757ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -285,6 +286,14 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected WorkerService validateAndGetWorkerService() {
+        try {
+            return pulsar().getWorkerService();
+        } catch (UnsupportedOperationException e) {
+            throw new RestException(Status.CONFLICT, e.getMessage());
+        }
+    }
+
     protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             Policies policies = namespaceResources().getPolicies(namespaceName)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f1c4c105de6..3e26215c32e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -55,7 +55,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class FunctionsBase extends AdminResource {
 
     Functions<? extends WorkerService> functions() {
-        return pulsar().getWorkerService().getFunctions();
+        return validateAndGetWorkerService().getFunctions();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index d45016454f5..3f97545010c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class SinksBase extends AdminResource {
 
     Sinks<? extends WorkerService> sinks() {
-        return pulsar().getWorkerService().getSinks();
+        return validateAndGetWorkerService().getSinks();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index b4ba332c312..5d020290a31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class SourcesBase extends AdminResource {
 
     Sources<? extends WorkerService> sources() {
-        return pulsar().getWorkerService().getSources();
+        return validateAndGetWorkerService().getSources();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index a83ea8cd514..1164e371bd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -53,7 +53,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class Functions extends AdminResource {
 
     FunctionsV2<? extends WorkerService> functions() {
-        return pulsar().getWorkerService().getFunctionsV2();
+        return validateAndGetWorkerService().getFunctionsV2();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 12b7950a582..616f46621c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -45,12 +45,12 @@ import org.apache.pulsar.functions.worker.service.api.Workers;
 public class Worker extends AdminResource implements Supplier<WorkerService> {
 
     Workers<? extends WorkerService> workers() {
-        return pulsar().getWorkerService().getWorkers();
+        return validateAndGetWorkerService().getWorkers();
     }
 
     @Override
     public WorkerService get() {
-        return pulsar().getWorkerService();
+        return validateAndGetWorkerService();
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 6703caa7a27..6b9837c52be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.functions.worker.service.api.Workers;
 public class WorkerStats extends AdminResource {
 
     public Workers<? extends WorkerService> workers() {
-        return pulsar().getWorkerService().getWorkers();
+        return validateAndGetWorkerService().getWorkers();
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index d7bd832adee..ddb1ed4d469 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.testng.annotations.AfterMethod;
@@ -82,21 +83,70 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
      */
     @Test
     public void testGetWorkerServiceException() throws Exception {
+        init();
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setZookeeperServers("localhost");
         configuration.setClusterName("clusterName");
         configuration.setFunctionsWorkerEnabled(false);
         configuration.setBrokerShutdownTimeoutMs(0L);
-        @Cleanup
-        PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
-                Optional.empty(), (exitCode) -> {});
+        configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        configuration.setBrokerServicePort(Optional.of(0));
+        configuration.setBrokerServicePortTls(Optional.of(0));
+        configuration.setWebServicePort(Optional.of(0));
+        configuration.setWebServicePortTls(Optional.of(0));
+        startBroker(configuration);
 
         String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false";
+
+        int thrownCnt = 0;
         try {
-            pulsarService.getWorkerService();
+            pulsar.getWorkerService();
         } catch (UnsupportedOperationException e) {
+            thrownCnt++;
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.sources().listSources("my", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
             assertEquals(e.getMessage(), errorMessage);
         }
+
+        try {
+            admin.sinks().getSinkStatus("my", "test", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.functions().getFunction("my", "test", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.worker().getClusterLeader();
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.worker().getFunctionsStats();
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        assertEquals(thrownCnt, 6);
     }
 
     @Test