You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 13:11:39 UTC

[pulsar] 01/05: [broker] change getWorkerService method to throw UnsupportedOperationException (#9738)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 67a4be681aee9a74da38c388da3721e1140cf9c8
Author: guodongyang <11...@qq.com>
AuthorDate: Wed Mar 3 12:13:18 2021 +0800

    [broker] change getWorkerService method to throw UnsupportedOperationException (#9738)
    
    Fix #9633
    
    When we set `functionsWorkerEnabled=false` in `broker.conf`, the broker doesn't support the management functions of `functionWorker`.
    But when we try to apply those methods, the broker throws a NullPointerException instead of a more user-friendly error that explains the problem.
    
    (cherry picked from commit baceabd482481f553d412a6f6946bd7f11a3abe2)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  5 +-
 .../apache/pulsar/broker/PulsarServiceTest.java    | 69 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c79909..f7c789b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -868,8 +868,9 @@ public class PulsarService implements AutoCloseable {
         return this.nsService;
     }
 
-    public WorkerService getWorkerService() {
-        return functionWorkerService.orElse(null);
+    public WorkerService getWorkerService() throws UnsupportedOperationException {
+        return functionWorkerService.orElseThrow(() -> new UnsupportedOperationException("Pulsar Function Worker "
+                + "is not enabled, probably functionsWorkerEnabled is set to false"));
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
new file mode 100644
index 0000000..0ff1ca1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertSame;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.testng.annotations.Test;
+
+
+@Slf4j
+public class PulsarServiceTest {
+
+    /** Verifies that getWorkerService returns the worker service passed to the constructor. */
+    @Test
+    public void testGetWorkerService() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setZookeeperServers("localhost");
+        configuration.setClusterName("clusterName");
+        configuration.setFunctionsWorkerEnabled(true);
+        WorkerService expectedWorkerService = mock(WorkerService.class);
+        PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(),
+                Optional.of(expectedWorkerService), (exitCode) -> {}));
+        assertSame(expectedWorkerService, pulsarService.getWorkerService());
+    }
+
+    /**
+     * Verifies that getWorkerService throws {@link UnsupportedOperationException}
+     * when functionsWorkerEnabled is set to false.
+     */
+    @Test
+    public void testGetWorkerServiceException() throws Exception {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setZookeeperServers("localhost");
+        configuration.setClusterName("clusterName");
+        configuration.setFunctionsWorkerEnabled(false);
+        PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
+                Optional.empty(), (exitCode) -> {});
+        String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false";
+        try {
+            pulsarService.getWorkerService();
+            // Without this, the test would pass silently if no exception were thrown.
+            org.testng.Assert.fail("Expected UnsupportedOperationException when the function worker is disabled");
+        } catch (UnsupportedOperationException e) {
+            assertEquals(e.getMessage(), errorMessage);
+        }
+    }
+}