You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 13:11:39 UTC
[pulsar] 01/05: [broker] change getWorkerService method to throw
UnsupportedOperationException (#9738)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 67a4be681aee9a74da38c388da3721e1140cf9c8
Author: guodongyang <11...@qq.com>
AuthorDate: Wed Mar 3 12:13:18 2021 +0800
[broker] change getWorkerService method to throw UnsupportedOperationException (#9738)
Fix #9633
When we set `functionsWorkerEnabled=false` in `broker.conf`, the broker doesn't support the management functions of `functionWorker`.
But when we try to apply those methods, the broker throws a NullPointerException instead of a more user-friendly error that explains the problem.
(cherry picked from commit baceabd482481f553d412a6f6946bd7f11a3abe2)
---
.../org/apache/pulsar/broker/PulsarService.java | 5 +-
.../apache/pulsar/broker/PulsarServiceTest.java | 69 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c79909..f7c789b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -868,8 +868,9 @@ public class PulsarService implements AutoCloseable {
return this.nsService;
}
- public WorkerService getWorkerService() {
- return functionWorkerService.orElse(null);
+ public WorkerService getWorkerService() throws UnsupportedOperationException {
+ return functionWorkerService.orElseThrow(() -> new UnsupportedOperationException("Pulsar Function Worker "
+ + "is not enabled, probably functionsWorkerEnabled is set to false"));
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
new file mode 100644
index 0000000..0ff1ca1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertSame;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.testng.annotations.Test;
+
+
+@Slf4j
+public class PulsarServiceTest {
+    /** Verifies that getWorkerService returns the WorkerService instance passed to the constructor. */
+    @Test
+    public void testGetWorkerService() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setZookeeperServers("localhost");
+        configuration.setClusterName("clusterName");
+        configuration.setFunctionsWorkerEnabled(true);
+        WorkerService expectedWorkerService = mock(WorkerService.class);
+        PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(),
+                Optional.of(expectedWorkerService), (exitCode) -> {}));
+        WorkerService actualWorkerService = pulsarService.getWorkerService();
+        assertSame(expectedWorkerService, actualWorkerService);
+    }
+
+    /**
+     * Verifies that the getWorkerService throws {@link UnsupportedOperationException}
+     * when functionsWorkerEnabled is set to false.
+     */
+    @Test
+    public void testGetWorkerServiceException() throws Exception {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setZookeeperServers("localhost");
+        configuration.setClusterName("clusterName");
+        configuration.setFunctionsWorkerEnabled(false);
+        PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
+                Optional.empty(), (exitCode) -> {});
+        String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false";
+        try {
+            pulsarService.getWorkerService();
+            // Reaching this line means nothing was thrown; without it the test would pass vacuously.
+            throw new AssertionError("Expected UnsupportedOperationException when the worker service is absent");
+        } catch (UnsupportedOperationException e) {
+            assertEquals(e.getMessage(), errorMessage);
+        }
+    }
+}