You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/08/27 15:42:20 UTC
[pulsar] branch master updated: Fix pulsar sink and source state
(#5046)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 40d6248 Fix pulsar sink and source state (#5046)
40d6248 is described below
commit 40d6248e380c2ece65d90897675be205090aa115
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Aug 27 08:42:15 2019 -0700
Fix pulsar sink and source state (#5046)
---
.../pulsar/functions/instance/ContextImpl.java | 16 +-
.../functions/instance/JavaInstanceRunnable.java | 6 +-
.../pulsar/functions/instance/ContextImplTest.java | 25 +-
tests/docker-images/java-test-functions/pom.xml | 7 +
.../pulsar/tests/integration/io/TestStateSink.java | 51 ++++
.../tests/integration/io/TestStateSource.java | 54 +++++
.../integration/functions/PulsarStateTest.java | 257 ++++++++++++++++++++-
7 files changed, 387 insertions(+), 29 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e5de503..eaadb38 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.functions.instance;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
+import org.apache.bookkeeper.api.kv.Table;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
@@ -47,7 +50,6 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
@@ -72,9 +74,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final SecretsProvider secretsProvider;
private final Map<String, Object> secretsMap;
- @Getter
- @Setter
- private StateContextImpl stateContext;
+ @VisibleForTesting
+ StateContextImpl stateContext;
private Map<String, Object> userConfigs;
private ComponentStatsManager statsManager;
@@ -95,7 +96,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
- Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
+ Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
+ Table<ByteBuf, ByteBuf> stateTable) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
@@ -146,6 +148,10 @@ class ContextImpl implements Context, SinkContext, SourceContext {
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.componentType = componentType;
+
+ if (null != stateTable) {
+ this.stateContext = new StateContextImpl(stateTable);
+ }
}
public void setCurrentMessageContext(Record<?> record) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 66d4ba7..3d91916 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -213,7 +213,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
- collectorRegistry, metricsLabels, this.componentType, this.stats);
+ collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
}
/**
@@ -232,10 +232,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.componentType);
javaInstance = setupJavaInstance();
- if (null != stateTable) {
- StateContextImpl stateContext = new StateContextImpl(stateTable);
- javaInstance.getContext().setStateContext(stateContext);
- }
while (true) {
currentRecord = readInput();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index bfe545a..9af7a47 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -81,13 +82,12 @@ public class ContextImplTest {
TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);
-
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
- FunctionDetails.ComponentType.FUNCTION, null);
+ FunctionDetails.ComponentType.FUNCTION, null, null);
context.setCurrentMessageContext((Record<String>) () -> null);
}
@@ -98,6 +98,7 @@ public class ContextImplTest {
@Test(expectedExceptions = IllegalStateException.class)
public void testGetCounterStateDisabled() {
+
context.getCounter("test-key");
}
@@ -113,35 +114,31 @@ public class ContextImplTest {
@Test
public void testIncrCounterStateEnabled() throws Exception {
- StateContextImpl stateContext = mock(StateContextImpl.class);
- context.setStateContext(stateContext);
+ context.stateContext = mock(StateContextImpl.class);
context.incrCounterAsync("test-key", 10L);
- verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
+ verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
}
@Test
public void testGetCounterStateEnabled() throws Exception {
- StateContextImpl stateContext = mock(StateContextImpl.class);
- context.setStateContext(stateContext);
+ context.stateContext = mock(StateContextImpl.class);
context.getCounterAsync("test-key");
- verify(stateContext, times(1)).getCounter(eq("test-key"));
+ verify(context.stateContext, times(1)).getCounter(eq("test-key"));
}
@Test
public void testPutStateStateEnabled() throws Exception {
- StateContextImpl stateContext = mock(StateContextImpl.class);
- context.setStateContext(stateContext);
+ context.stateContext = mock(StateContextImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
context.putStateAsync("test-key", buffer);
- verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
+ verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
}
@Test
public void testGetStateStateEnabled() throws Exception {
- StateContextImpl stateContext = mock(StateContextImpl.class);
- context.setStateContext(stateContext);
+ context.stateContext = mock(StateContextImpl.class);
context.getStateAsync("test-key");
- verify(stateContext, times(1)).get(eq("test-key"));
+ verify(context.stateContext, times(1)).get(eq("test-key"));
}
@Test
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index 1a0f42b..4889049 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -29,6 +29,13 @@
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>java-test-functions</artifactId>
<name>Apache Pulsar :: Tests :: Docker Images :: Java Test Functions</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
<packaging>jar</packaging>
<profiles>
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java
new file mode 100644
index 0000000..d1d0740
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class TestStateSink implements Sink<String> {
+
+ private SinkContext sinkContext;
+ private int count;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ sinkContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
+ this.sinkContext = sinkContext;
+ }
+
+ @Override
+ public void write(Record<String> record) throws Exception {
+ String initial = new String(sinkContext.getState("initial").array());
+ String val = String.format("%s-%d", initial, count);
+ sinkContext.putState("now", ByteBuffer.wrap(val.getBytes()));
+ count++;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java
new file mode 100644
index 0000000..ebbd809
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class TestStateSource implements Source<String> {
+
+
+ private SourceContext sourceContext;
+ private int count;
+
+ @Override
+ public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ sourceContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
+ this.sourceContext = sourceContext;
+ }
+
+ @Override
+ public Record<String> read() throws Exception {
+ Thread.sleep(50);
+ String initial = new String(sourceContext.getState("initial").array());
+ String val = String.format("%s-%d", initial, count);
+ sourceContext.putState("now", ByteBuffer.wrap(val.getBytes()));
+ count++;
+ return () -> val;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 264e9bc..2e386eb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -18,18 +18,20 @@
*/
package org.apache.pulsar.tests.integration.functions;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -38,9 +40,17 @@ import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
/**
* State related test cases.
*/
+@Slf4j
public class PulsarStateTest extends PulsarStandaloneTestSuite {
public static final String WORDCOUNT_PYTHON_CLASS =
@@ -84,6 +94,139 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
getFunctionInfoNotFound(functionName);
}
+ @Test
+ public void testSourceState() throws Exception {
+ String outputTopicName = "test-state-source-output-" + randomName(8);
+ String sourceName = "test-state-source-" + randomName(8);
+
+ submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSource", JAVAJAR);
+
+ // get source info
+ getSourceInfoSuccess(sourceName);
+
+ // get source status
+ getSourceStatus(sourceName);
+
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+ retryStrategically((test) -> {
+ try {
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWritten > 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 200);
+
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+
+ FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "initial");
+ assertEquals(functionState.getStringValue(), "val1");
+
+ functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
+ assertTrue(functionState.getStringValue().matches("val1-.*"));
+ }
+
+ // delete source
+ deleteSource(sourceName);
+
+ getSourceInfoNotFound(sourceName);
+ }
+
+ @Test
+ public void testSinkState() throws Exception {
+ String inputTopicName = "test-state-sink-input-" + randomName(8);
+ String sinkName = "test-state-sink-" + randomName(8);
+ int numMessages = 10;
+
+ submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR);
+
+ // get sink info
+ getSinkInfoSuccess(sinkName);
+
+ // get sink status
+ getSinkStatus(sinkName);
+
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+ // java supports schema
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(container.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopicName)
+ .create();
+
+ FunctionState functionState = admin.functions().getFunctionState("public", "default", sinkName, "initial");
+ assertEquals(functionState.getStringValue(), "val1");
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("foo");
+ }
+
+ retryStrategically((test) -> {
+ try {
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWrittenToSink > 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 10, 200);
+
+ SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0);
+
+ functionState = admin.functions().getFunctionState("public", "default", sinkName, "now");
+ assertEquals(functionState.getStringValue(), String.format("val1-%d", numMessages - 1));
+ }
+
+ // delete source
+ deleteSink(sinkName);
+
+ getSinkInfoNotFound(sinkName);
+ }
+
+ private void submitSourceConnector(String sourceName,
+ String outputTopicName,
+ String className,
+ String archive) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources", "create",
+ "--name", sourceName,
+ "--destinationTopicName", outputTopicName,
+ "--archive", archive,
+ "--classname", className
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result = container.execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
+ private void submitSinkConnector(String sinkName,
+ String inputTopicName,
+ String className,
+ String archive) throws Exception {
+ String[] commands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "sinks", "create",
+ "--name", sinkName,
+ "--inputs", inputTopicName,
+ "--archive", archive,
+ "--classname", className
+ };
+ log.info("Run command : {}", StringUtils.join(commands, ' '));
+ ContainerExecResult result = container.execCmd(commands);
+ assertTrue(
+ result.getStdout().contains("\"Created successfully\""),
+ result.getStdout());
+ }
+
private static void submitExclamationFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
@@ -151,6 +294,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
}
}
+ private static void getSinkInfoSuccess(String sinkName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sinks",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sinkName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
+ }
+
+ private static void getSourceInfoSuccess(String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
+ }
+
private static void getFunctionInfoSuccess(String functionName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
@@ -178,6 +345,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
}
}
+ private static void getSinkStatus(String sinkName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sinks",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sinkName
+ );
+ assertTrue(result.getStdout().contains("\"running\" : true"));
+ }
+
+ private static void getSourceStatus(String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "status",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("\"running\" : true"));
+ }
+
private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
@@ -243,4 +434,60 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
assertTrue(result.getStderr().isEmpty());
}
+ private static void deleteSource(String sourceName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName
+ );
+ assertTrue(result.getStdout().contains("Delete source successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
+
+ private static void deleteSink(String sinkName) throws Exception {
+ ContainerExecResult result = container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sinks",
+ "delete",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sinkName
+ );
+ assertTrue(result.getStdout().contains("Deleted successfully"));
+ assertTrue(result.getStderr().isEmpty());
+ }
+
+ private static void getSourceInfoNotFound(String sourceName) throws Exception {
+ try {
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sources",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sourceName);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
+ }
+ }
+
+ private static void getSinkInfoNotFound(String sinkName) throws Exception {
+ try {
+ container.execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "sinks",
+ "get",
+ "--tenant", "public",
+ "--namespace", "default",
+ "--name", sinkName);
+ fail("Command should have exited with non-zero");
+ } catch (ContainerExecException e) {
+ assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
+ }
+ }
+
}