You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/09/14 07:08:49 UTC
[pulsar] branch branch-2.11 updated: [fix][cli] Pulsar shell: ensure admin client is recycled or disposed (#17619)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bfcdc2dc2ce [fix][cli] Pulsar shell: ensure admin client is recycled or disposed (#17619)
bfcdc2dc2ce is described below
commit bfcdc2dc2cea7e3750027fb2e828c9363e028423
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Sep 14 09:01:20 2022 +0200
[fix][cli] Pulsar shell: ensure admin client is recycled or disposed (#17619)
* [fix][cli] Pulsar shell: ensure admin client is recycled or disposed
(cherry picked from commit 1ff9fb6b7cf502b1396dc94f91e3c01f8f18f140)
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 34 +++---
.../pulsar/admin/cli/PulsarAdminSupplier.java | 108 +++++++++++++++++++
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 116 ++++++---------------
.../java/org/apache/pulsar/shell/AdminShell.java | 2 +-
.../org/apache/pulsar/shell/AdminShellTest.java | 72 +++++++++++++
.../org/apache/pulsar/shell/PulsarShellTest.java | 28 +++--
6 files changed, 238 insertions(+), 122 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 24842b9e11a..71359f6bab8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
+import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -2057,18 +2058,17 @@ public class PulsarAdminToolTest {
//Ok
}
- Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
- adminBuilderField.setAccessible(true);
- PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
+
+ final PulsarAdmin admin = tool.getPulsarAdminSupplier().get();
Field requestTimeoutField =
- PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeout");
+ PulsarAdminImpl.class.getDeclaredField("requestTimeout");
requestTimeoutField.setAccessible(true);
- int requestTimeout = (int) requestTimeoutField.get(builder);
+ int requestTimeout = (int) requestTimeoutField.get(admin);
Field requestTimeoutUnitField =
- PulsarAdminBuilderImpl.class.getDeclaredField("requestTimeoutUnit");
+ PulsarAdminImpl.class.getDeclaredField("requestTimeoutUnit");
requestTimeoutUnitField.setAccessible(true);
- TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(builder);
+ TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(admin);
assertEquals(1, requestTimeout);
assertEquals(TimeUnit.SECONDS, requestTimeoutUnit);
}
@@ -2094,12 +2094,8 @@ public class PulsarAdminToolTest {
}
// validate Authentication-tls has been configured
- Field adminBuilderField = PulsarAdminTool.class.getDeclaredField("adminBuilder");
- adminBuilderField.setAccessible(true);
- PulsarAdminBuilderImpl builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
- Field confField = PulsarAdminBuilderImpl.class.getDeclaredField("conf");
- confField.setAccessible(true);
- ClientConfigurationData conf = (ClientConfigurationData) confField.get(builder);
+ ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get())
+ .getClientConfigData();
AuthenticationTls atuh = (AuthenticationTls) conf.getAuthentication();
assertEquals(atuh.getCertFilePath(), certFilePath);
assertEquals(atuh.getKeyFilePath(), keyFilePath);
@@ -2112,8 +2108,8 @@ public class PulsarAdminToolTest {
// Ok
}
- builder = (PulsarAdminBuilderImpl) adminBuilderField.get(tool);
- conf = (ClientConfigurationData) confField.get(builder);
+ conf = conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get())
+ .getClientConfigData();
atuh = (AuthenticationTls) conf.getAuthentication();
assertNull(atuh.getCertFilePath());
assertNull(atuh.getKeyFilePath());
@@ -2330,12 +2326,8 @@ public class PulsarAdminToolTest {
properties.put("webServiceUrl", "http://localhost:2181");
properties.put("cliExtensionsDirectory", narFile.getParentFile().getAbsolutePath());
properties.put("customCommandFactories", "dummy");
- PulsarAdminTool tool = new PulsarAdminTool(properties) {
- @Override
- protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
- return builder;
- }
- };
+ PulsarAdminTool tool = new PulsarAdminTool(properties);
+ tool.setPulsarAdminSupplier(new PulsarAdminSupplier(builder, tool.getRootParams()));
// see the custom command help in the main help
StringBuilder logs = new StringBuilder();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
new file mode 100644
index 00000000000..42db04ad754
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Data;
+import lombok.SneakyThrows;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+
+public class PulsarAdminSupplier implements Supplier<PulsarAdmin> {
+
+ @Data
+ private static final class RootParamsKey {
+ String serviceUrl;
+ String authPluginClassName;
+ int requestTimeout;
+ String authParams;
+ Boolean tlsAllowInsecureConnection;
+ String tlsTrustCertsFilePath;
+ Boolean tlsEnableHostnameVerification;
+ String tlsProvider;
+
+ static RootParamsKey fromRootParams(PulsarAdminTool.RootParams params) {
+ final RootParamsKey key = new RootParamsKey();
+ key.setServiceUrl(params.getServiceUrl());
+ key.setAuthParams(params.getAuthParams());
+ key.setAuthPluginClassName(params.getAuthPluginClassName());
+ key.setRequestTimeout(params.getRequestTimeout());
+ key.setTlsAllowInsecureConnection(params.getTlsAllowInsecureConnection());
+ key.setTlsTrustCertsFilePath(params.getTlsTrustCertsFilePath());
+ key.setTlsEnableHostnameVerification(params.getTlsEnableHostnameVerification());
+ key.setTlsProvider(params.getTlsProvider());
+ return key;
+ }
+ }
+
+ private final PulsarAdminBuilder adminBuilder;
+ private RootParamsKey currentParamsKey;
+ private PulsarAdmin admin;
+
+ public PulsarAdminSupplier(PulsarAdminBuilder baseAdminBuilder, PulsarAdminTool.RootParams rootParams) {
+ this.adminBuilder = baseAdminBuilder;
+ rootParamsUpdated(rootParams);
+ }
+
+ @Override
+ public PulsarAdmin get() {
+ if (admin == null) {
+ try {
+ admin = adminBuilder.build();
+ } catch (Exception ex) {
+ System.err.println(ex.getClass() + ": " + ex.getMessage());
+ throw new RuntimeException("Not able to create pulsar admin: " + ex.getMessage(), ex);
+ }
+ }
+ return admin;
+ }
+
+ void rootParamsUpdated(PulsarAdminTool.RootParams newParams) {
+ final RootParamsKey newParamsKey = RootParamsKey.fromRootParams(newParams);
+ if (newParamsKey.equals(currentParamsKey)) {
+ return;
+ }
+ applyRootParamsToAdminBuilder(adminBuilder, newParams);
+ currentParamsKey = newParamsKey;
+ if (admin != null) {
+ admin.close();
+ }
+ this.admin = null;
+ }
+
+ @SneakyThrows
+ private static void applyRootParamsToAdminBuilder(PulsarAdminBuilder adminBuilder,
+ PulsarAdminTool.RootParams rootParams) {
+ adminBuilder.serviceHttpUrl(rootParams.serviceUrl);
+ adminBuilder.authentication(rootParams.authPluginClassName, rootParams.authParams);
+ adminBuilder.requestTimeout(rootParams.requestTimeout, TimeUnit.SECONDS);
+ if (rootParams.tlsAllowInsecureConnection != null) {
+ adminBuilder.allowTlsInsecureConnection(rootParams.tlsAllowInsecureConnection);
+ }
+ if (rootParams.tlsEnableHostnameVerification != null) {
+ adminBuilder.enableTlsHostnameVerification(rootParams.tlsEnableHostnameVerification);
+ }
+ if (isNotBlank(rootParams.tlsProvider)) {
+ adminBuilder.sslProvider(rootParams.tlsProvider);
+ }
+ }
+
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index f62819ac640..567b7f01f07 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -31,8 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
@@ -53,9 +51,9 @@ public class PulsarAdminTool {
protected List<CustomCommandFactory> customCommandFactories = new ArrayList();
protected Map<String, Class<?>> commandMap;
protected JCommander jcommander;
- protected final PulsarAdminBuilder adminBuilder;
protected RootParams rootParams;
private final Properties properties;
+ private PulsarAdminSupplier pulsarAdminSupplier;
@Getter
public static class RootParams {
@@ -105,11 +103,13 @@ public class PulsarAdminTool {
rootParams = new RootParams();
// fallback to previous-version serviceUrl property to maintain backward-compatibility
initRootParamsFromProperties(properties);
- adminBuilder = createAdminBuilder(properties);
+ final PulsarAdminBuilder baseAdminBuilder = createAdminBuilderFromProperties(properties);
+ pulsarAdminSupplier = new PulsarAdminSupplier(baseAdminBuilder, rootParams);
initJCommander();
}
- protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
+
+ private static PulsarAdminBuilder createAdminBuilderFromProperties(Properties properties) {
boolean useKeyStoreTls = Boolean
.parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
String tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
@@ -121,16 +121,12 @@ public class PulsarAdminTool {
String tlsKeyFilePath = properties.getProperty("tlsKeyFilePath");
String tlsCertificateFilePath = properties.getProperty("tlsCertificateFilePath");
- boolean tlsAllowInsecureConnection = this.rootParams.tlsAllowInsecureConnection != null
- ? this.rootParams.tlsAllowInsecureConnection
- : Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
+ boolean tlsAllowInsecureConnection = Boolean.parseBoolean(properties
+ .getProperty("tlsAllowInsecureConnection", "false"));
- boolean tlsEnableHostnameVerification = this.rootParams.tlsEnableHostnameVerification != null
- ? this.rootParams.tlsEnableHostnameVerification
- : Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
- final String tlsTrustCertsFilePath = isNotBlank(this.rootParams.tlsTrustCertsFilePath)
- ? this.rootParams.tlsTrustCertsFilePath
- : properties.getProperty("tlsTrustCertsFilePath");
+ boolean tlsEnableHostnameVerification = Boolean.parseBoolean(properties
+ .getProperty("tlsEnableHostnameVerification", "false"));
+ final String tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
return PulsarAdmin.builder().allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsEnableHostnameVerification)
@@ -152,39 +148,19 @@ public class PulsarAdminTool {
: properties.getProperty("serviceUrl");
rootParams.authPluginClassName = properties.getProperty("authPlugin");
rootParams.authParams = properties.getProperty("authParams");
+ rootParams.tlsProvider = properties.getProperty("webserviceTlsProvider");
}
- private static class PulsarAdminSupplier implements Supplier<PulsarAdmin> {
-
- private final PulsarAdminBuilder pulsarAdminBuilder;
- private final Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory;
- private PulsarAdmin admin;
- private PulsarAdminSupplier(PulsarAdminBuilder pulsarAdminBuilder,
- Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
- this.pulsarAdminBuilder = pulsarAdminBuilder;
- this.adminFactory = adminFactory;
- }
-
- @Override
- public PulsarAdmin get() {
- if (admin == null) {
- admin = adminFactory.apply(pulsarAdminBuilder);
- }
- return admin;
- }
- }
-
- public void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
+ public void setupCommands() {
try {
- Supplier<PulsarAdmin> admin = new PulsarAdminSupplier(adminBuilder, adminFactory);
for (Map.Entry<String, Class<?>> c : commandMap.entrySet()) {
- addCommand(c, admin);
+ addCommand(c, pulsarAdminSupplier);
}
CommandExecutionContext context = new CommandExecutionContext() {
@Override
public PulsarAdmin getPulsarAdmin() {
- return admin.get();
+ return pulsarAdminSupplier.get();
}
@Override
@@ -197,7 +173,7 @@ public class PulsarAdminTool {
for (CustomCommandFactory factory : customCommandFactories) {
List<CustomCommandGroup> customCommandGroups = factory.commandGroups(context);
for (CustomCommandGroup group : customCommandGroups) {
- Object generated = CustomCommandsUtils.generateCliCommand(group, context, admin);
+ Object generated = CustomCommandsUtils.generateCliCommand(group, context, pulsarAdminSupplier);
jcommander.addCommand(group.name(), generated);
commandMap.put(group.name(), null);
}
@@ -215,7 +191,7 @@ public class PulsarAdminTool {
}
private void loadCustomCommandFactories() throws Exception {
- customCommandFactories.addAll(CustomCommandFactoryProvider.createCustomCommandFactories(properties));
+ customCommandFactories = CustomCommandFactoryProvider.createCustomCommandFactories(properties);
}
@@ -237,12 +213,8 @@ public class PulsarAdminTool {
}
protected boolean run(String[] args) {
- final Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory = createAdminFactory(args);
- return run(args, adminFactory);
- }
+ setupCommands();
- boolean run(String[] args, Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
- setupCommands(adminFactory);
if (args.length == 0) {
jcommander.usage();
return false;
@@ -257,17 +229,8 @@ public class PulsarAdminTool {
try {
jcommander.parse(Arrays.copyOfRange(args, 0, Math.min(cmdPos, args.length)));
-
//rootParams are populated by jcommander.parse
- adminBuilder.serviceHttpUrl(rootParams.serviceUrl);
- adminBuilder.authentication(rootParams.authPluginClassName, rootParams.authParams);
- adminBuilder.requestTimeout(rootParams.requestTimeout, TimeUnit.SECONDS);
- if (isBlank(rootParams.tlsProvider)) {
- rootParams.tlsProvider = properties.getProperty("webserviceTlsProvider");
- }
- if (isNotBlank(rootParams.tlsProvider)) {
- adminBuilder.sslProvider(rootParams.tlsProvider);
- }
+ pulsarAdminSupplier.rootParamsUpdated(rootParams);
} catch (Exception e) {
System.err.println(e.getMessage());
System.err.println();
@@ -348,35 +311,6 @@ public class PulsarAdminTool {
}
}
- private Function<PulsarAdminBuilder, ? extends PulsarAdmin> createAdminFactory(String[] args) {
- int cmdPos;
- for (cmdPos = 0; cmdPos < args.length; cmdPos++) {
- if (commandMap.containsKey(args[cmdPos])) {
- break;
- }
- }
-
- ++cmdPos;
- boolean isLocalRun = cmdPos < args.length && "localrun".equalsIgnoreCase(args[cmdPos]);
-
- Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory;
- if (isLocalRun) {
- // bypass constructing admin client
- adminFactory = (adminBuilder) -> null;
- } else {
- adminFactory = (adminBuilder) -> {
- try {
- return adminBuilder.build();
- } catch (Exception ex) {
- System.err.println(ex.getClass() + ": " + ex.getMessage());
- exit(1);
- return null;
- }
- };
- }
- return adminFactory;
- }
-
static void setAllowSystemExit(boolean allowSystemExit) {
PulsarAdminTool.allowSystemExit = allowSystemExit;
}
@@ -434,4 +368,18 @@ public class PulsarAdminTool {
commandMap.put("transactions", CmdTransactions.class);
}
+ @VisibleForTesting
+ public void setPulsarAdminSupplier(PulsarAdminSupplier pulsarAdminSupplier) {
+ this.pulsarAdminSupplier = pulsarAdminSupplier;
+ }
+
+ @VisibleForTesting
+ public PulsarAdminSupplier getPulsarAdminSupplier() {
+ return pulsarAdminSupplier;
+ }
+
+ @VisibleForTesting
+ public RootParams getRootParams() {
+ return rootParams;
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
index 67eb0cc604e..b5a4247c695 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/AdminShell.java
@@ -51,7 +51,7 @@ public class AdminShell extends PulsarAdminTool implements ShellCommandsProvider
@Override
public void setupState(Properties properties) {
getJCommander().setProgramName(getName());
- setupCommands(b -> null);
+ setupCommands();
}
@Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/AdminShellTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/AdminShellTest.java
new file mode 100644
index 00000000000..33d66bba99e
--- /dev/null
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/AdminShellTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.shell;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import java.util.Properties;
+import org.apache.pulsar.admin.cli.PulsarAdminSupplier;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.Topics;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class AdminShellTest {
+
+ private final Properties props = new Properties();
+ private AdminShell adminShell;
+
+ @BeforeMethod
+ public void before() throws Exception {
+ props.setProperty("webServiceUrl", "http://localhost:8080");
+ adminShell = new AdminShell(props);
+ }
+
+ @Test
+ public void test() throws Exception {
+ final PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
+ final PulsarAdmin admin = mock(PulsarAdmin.class);
+ when(builder.build()).thenReturn(admin);
+ when(admin.topics()).thenReturn(mock(Topics.class));
+ adminShell.setPulsarAdminSupplier(new PulsarAdminSupplier(builder, adminShell.getRootParams()));
+ assertTrue(run(new String[]{"topics", "list", "public/default"}));
+ verify(builder).build();
+ assertTrue(run(new String[]{"topics", "list", "public/default"}));
+ verify(builder).build();
+ assertTrue(run(new String[]{"--admin-url", "http://localhost:8081",
+ "topics", "list", "public/default"}));
+ assertTrue(run(new String[]{"topics", "list", "public/default"}));
+ verify(builder, times(3)).build();
+ assertTrue(run(new String[]{"--admin-url", "http://localhost:8080",
+ "topics", "list", "public/default"}));
+ verify(builder, times(3)).build();
+ }
+
+ private boolean run(String[] args) throws Exception {
+ try {
+ return adminShell.runCommand(args);
+ } finally {
+ adminShell.cleanupState(props);
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
index 1102845ada2..de6d77fe23e 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/shell/PulsarShellTest.java
@@ -36,8 +36,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
+import org.apache.pulsar.admin.cli.PulsarAdminSupplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.cli.CmdProduce;
import org.jline.reader.EndOfFileException;
@@ -55,7 +55,6 @@ public class PulsarShellTest {
private static final Logger log = LoggerFactory.getLogger(PulsarShellTest.class);
- private PulsarAdminBuilder pulsarAdminBuilder;
private PulsarAdmin pulsarAdmin;
private Topics topics;
@@ -89,23 +88,22 @@ public class PulsarShellTest {
private static class TestPulsarShell extends PulsarShell {
- private final PulsarAdminBuilder pulsarAdminBuilder;
+ private final PulsarAdmin pulsarAdmin;
AtomicReference<CmdProduce> cmdProduceHolder = new AtomicReference<>();
Integer exitCode;
- public TestPulsarShell(String[] args, Properties props, PulsarAdminBuilder pulsarAdminBuilder) throws IOException {
+ public TestPulsarShell(String[] args, Properties props, PulsarAdmin pulsarAdmin) throws IOException {
super(args, props);
- this.pulsarAdminBuilder = pulsarAdminBuilder;
+ this.pulsarAdmin = pulsarAdmin;
}
@Override
protected AdminShell createAdminShell(Properties properties) throws Exception {
- return new AdminShell(properties) {
- @Override
- protected PulsarAdminBuilder createAdminBuilder(Properties properties) {
- return pulsarAdminBuilder;
- }
- };
+ final AdminShell adminShell = new AdminShell(properties);
+ final PulsarAdminSupplier supplier = mock(PulsarAdminSupplier.class);
+ when(supplier.get()).thenReturn(pulsarAdmin);
+ adminShell.setPulsarAdminSupplier(supplier);
+ return adminShell;
}
@Override
@@ -136,9 +134,7 @@ public class PulsarShellTest {
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
- pulsarAdminBuilder = mock(PulsarAdminBuilder.class);
pulsarAdmin = mock(PulsarAdmin.class);
- when(pulsarAdminBuilder.build()).thenReturn(pulsarAdmin);
topics = mock(Topics.class);
when(pulsarAdmin.topics()).thenReturn(topics);
}
@@ -154,7 +150,7 @@ public class PulsarShellTest {
linereader.addCmd("admin topics create my-topic --metadata a=b ");
linereader.addCmd("client produce -m msg my-topic");
linereader.addCmd("quit");
- final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{}, props, pulsarAdminBuilder);
+ final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{}, props, pulsarAdmin);
testPulsarShell.run((a) -> linereader, (a) -> terminal);
verify(topics).createNonPartitionedTopic(eq("persistent://public/default/my-topic"), any(Map.class));
verify(testPulsarShell.cmdProduceHolder.get()).run();
@@ -173,7 +169,7 @@ public class PulsarShellTest {
.getContextClassLoader().getResource("test-shell-file").getFile();
final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{"-f", shellFile},
- props, pulsarAdminBuilder);
+ props, pulsarAdmin);
testPulsarShell.run((a) -> linereader, (a) -> terminal);
verify(topics).createNonPartitionedTopic(eq("persistent://public/default/my-topic"), any(Map.class));
verify(testPulsarShell.cmdProduceHolder.get()).run();
@@ -190,7 +186,7 @@ public class PulsarShellTest {
.getContextClassLoader().getResource("test-shell-file-error").getFile();
final TestPulsarShell testPulsarShell = new TestPulsarShell(new String[]{"-f", shellFile, "--fail-on-error"},
- props, pulsarAdminBuilder);
+ props, pulsarAdmin);
try {
testPulsarShell.run((a) -> linereader, (a) -> terminal);
fail();