You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/08/27 15:42:20 UTC

[pulsar] branch master updated: Fix pulsar sink and source state (#5046)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d6248  Fix pulsar sink and source state (#5046)
40d6248 is described below

commit 40d6248e380c2ece65d90897675be205090aa115
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Aug 27 08:42:15 2019 -0700

    Fix pulsar sink and source state (#5046)
---
 .../pulsar/functions/instance/ContextImpl.java     |  16 +-
 .../functions/instance/JavaInstanceRunnable.java   |   6 +-
 .../pulsar/functions/instance/ContextImplTest.java |  25 +-
 tests/docker-images/java-test-functions/pom.xml    |   7 +
 .../pulsar/tests/integration/io/TestStateSink.java |  51 ++++
 .../tests/integration/io/TestStateSource.java      |  54 +++++
 .../integration/functions/PulsarStateTest.java     | 257 ++++++++++++++++++++-
 7 files changed, 387 insertions(+), 29 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e5de503..eaadb38 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import io.netty.buffer.ByteBuf;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.bookkeeper.api.kv.Table;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
@@ -47,7 +50,6 @@ import org.slf4j.Logger;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -72,9 +74,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     private final SecretsProvider secretsProvider;
     private final Map<String, Object> secretsMap;
 
-    @Getter
-    @Setter
-    private StateContextImpl stateContext;
+    @VisibleForTesting
+    StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
     private ComponentStatsManager statsManager;
@@ -95,7 +96,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
-                       Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
+                       Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
+                       Table<ByteBuf, ByteBuf> stateTable) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
@@ -146,6 +148,10 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                 .quantile(0.999, 0.01)
                 .register(collectorRegistry);
         this.componentType = componentType;
+
+        if (null != stateTable) {
+            this.stateContext = new StateContextImpl(stateTable);
+        }
     }
 
     public void setCurrentMessageContext(Record<?> record) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 66d4ba7..3d91916 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -213,7 +213,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
-                collectorRegistry, metricsLabels, this.componentType, this.stats);
+                collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
     }
 
     /**
@@ -232,10 +232,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     this.componentType);
 
             javaInstance = setupJavaInstance();
-            if (null != stateTable) {
-                StateContextImpl stateContext = new StateContextImpl(stateTable);
-                javaInstance.getContext().setStateContext(stateContext);
-            }
             while (true) {
                 currentRecord = readInput();
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index bfe545a..9af7a47 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.bookkeeper.api.kv.Table;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
@@ -81,13 +82,12 @@ public class ContextImplTest {
         TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
         doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
         when(producer.newMessage()).thenReturn(messageBuilder);
-
         context = new ContextImpl(
             config,
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
-                FunctionDetails.ComponentType.FUNCTION, null);
+                FunctionDetails.ComponentType.FUNCTION, null, null);
         context.setCurrentMessageContext((Record<String>) () -> null);
     }
 
@@ -98,6 +98,7 @@ public class ContextImplTest {
 
     @Test(expectedExceptions = IllegalStateException.class)
     public void testGetCounterStateDisabled() {
+
         context.getCounter("test-key");
     }
 
@@ -113,35 +114,31 @@ public class ContextImplTest {
 
     @Test
     public void testIncrCounterStateEnabled() throws Exception {
-        StateContextImpl stateContext = mock(StateContextImpl.class);
-        context.setStateContext(stateContext);
+        context.stateContext = mock(StateContextImpl.class);
         context.incrCounterAsync("test-key", 10L);
-        verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
+        verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
     }
 
     @Test
     public void testGetCounterStateEnabled() throws Exception {
-        StateContextImpl stateContext = mock(StateContextImpl.class);
-        context.setStateContext(stateContext);
+        context.stateContext = mock(StateContextImpl.class);
         context.getCounterAsync("test-key");
-        verify(stateContext, times(1)).getCounter(eq("test-key"));
+        verify(context.stateContext, times(1)).getCounter(eq("test-key"));
     }
 
     @Test
     public void testPutStateStateEnabled() throws Exception {
-        StateContextImpl stateContext = mock(StateContextImpl.class);
-        context.setStateContext(stateContext);
+        context.stateContext = mock(StateContextImpl.class);
         ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
         context.putStateAsync("test-key", buffer);
-        verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
+        verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
     }
 
     @Test
     public void testGetStateStateEnabled() throws Exception {
-        StateContextImpl stateContext = mock(StateContextImpl.class);
-        context.setStateContext(stateContext);
+        context.stateContext = mock(StateContextImpl.class);
         context.getStateAsync("test-key");
-        verify(stateContext, times(1)).get(eq("test-key"));
+        verify(context.stateContext, times(1)).get(eq("test-key"));
     }
 
     @Test
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index 1a0f42b..4889049 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -29,6 +29,13 @@
     <groupId>org.apache.pulsar.tests</groupId>
     <artifactId>java-test-functions</artifactId>
     <name>Apache Pulsar :: Tests :: Docker Images :: Java Test Functions</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
     <packaging>jar</packaging>
 
     <profiles>
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java
new file mode 100644
index 0000000..d1d0740
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSink.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class TestStateSink implements Sink<String> {
+
+    private SinkContext sinkContext;
+    private int count;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        sinkContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
+        this.sinkContext = sinkContext;
+    }
+
+    @Override
+    public void write(Record<String> record) throws Exception {
+        String initial = new String(sinkContext.getState("initial").array());
+        String val = String.format("%s-%d", initial, count);
+        sinkContext.putState("now", ByteBuffer.wrap(val.getBytes()));
+        count++;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java
new file mode 100644
index 0000000..ebbd809
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestStateSource.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class TestStateSource implements Source<String> {
+
+
+    private SourceContext sourceContext;
+    private int count;
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        sourceContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
+        this.sourceContext = sourceContext;
+    }
+
+    @Override
+    public Record<String> read() throws Exception {
+        Thread.sleep(50);
+        String initial = new String(sourceContext.getState("initial").array());
+        String val = String.format("%s-%d", initial, count);
+        sourceContext.putState("now", ByteBuffer.wrap(val.getBytes()));
+        count++;
+        return () -> val;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 264e9bc..2e386eb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -18,18 +18,20 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -38,9 +40,17 @@ import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 /**
  * State related test cases.
  */
+@Slf4j
 public class PulsarStateTest extends PulsarStandaloneTestSuite {
 
     public static final String WORDCOUNT_PYTHON_CLASS =
@@ -84,6 +94,139 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         getFunctionInfoNotFound(functionName);
     }
 
+    @Test
+    public void testSourceState() throws Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+
+        submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSource",  JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(sourceName);
+
+        // get source status
+        getSourceStatus(sourceName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+            retryStrategically((test) -> {
+                try {
+                    SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                    return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWritten > 0;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 10, 200);
+
+            SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+            assertEquals(status.getInstances().size(), 1);
+            assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+
+            FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "initial");
+            assertEquals(functionState.getStringValue(), "val1");
+
+            functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
+            assertTrue(functionState.getStringValue().matches("val1-.*"));
+        }
+
+        // delete source
+        deleteSource(sourceName);
+
+        getSourceInfoNotFound(sourceName);
+    }
+
+    @Test
+    public void testSinkState() throws Exception {
+        String inputTopicName = "test-state-sink-input-" + randomName(8);
+        String sinkName = "test-state-sink-" + randomName(8);
+        int numMessages = 10;
+
+        submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink",  JAVAJAR);
+
+        // get sink info
+        getSinkInfoSuccess(sinkName);
+
+        // get sink status
+        getSinkStatus(sinkName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+
+            // java supports schema
+            @Cleanup PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(container.getPlainTextServiceUrl())
+                    .build();
+            @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(inputTopicName)
+                    .create();
+
+            FunctionState functionState = admin.functions().getFunctionState("public", "default", sinkName, "initial");
+            assertEquals(functionState.getStringValue(), "val1");
+
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("foo");
+            }
+
+            retryStrategically((test) -> {
+                try {
+                    SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+                    return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWrittenToSink > 0;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 10, 200);
+
+            SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
+            assertEquals(status.getInstances().size(), 1);
+            assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0);
+
+            functionState = admin.functions().getFunctionState("public", "default", sinkName, "now");
+            assertEquals(functionState.getStringValue(), String.format("val1-%d", numMessages - 1));
+        }
+
+        // delete source
+        deleteSink(sinkName);
+
+        getSinkInfoNotFound(sinkName);
+    }
+
+    private void submitSourceConnector(String sourceName,
+                                         String outputTopicName,
+                                         String className,
+                                         String archive) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources", "create",
+                "--name", sourceName,
+                "--destinationTopicName", outputTopicName,
+                "--archive", archive,
+                "--classname", className
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Created successfully\""),
+                result.getStdout());
+    }
+
+    private void submitSinkConnector(String sinkName,
+                                         String inputTopicName,
+                                         String className,
+                                         String archive) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks", "create",
+                "--name", sinkName,
+                "--inputs", inputTopicName,
+                "--archive", archive,
+                "--classname", className
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("\"Created successfully\""),
+                result.getStdout());
+    }
+
     private static void submitExclamationFunction(Runtime runtime,
                                                   String inputTopicName,
                                                   String outputTopicName,
@@ -151,6 +294,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         }
     }
 
+    private static void getSinkInfoSuccess(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
+    }
+
+    private static void getSourceInfoSuccess(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
+    }
+
     private static void getFunctionInfoSuccess(String functionName) throws Exception {
         ContainerExecResult result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -178,6 +345,30 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         }
     }
 
+    private static void getSinkStatus(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
+    private static void getSourceStatus(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
     private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
         ContainerExecResult result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -243,4 +434,60 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         assertTrue(result.getStderr().isEmpty());
     }
 
+    private static void deleteSource(String sourceName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName
+        );
+        assertTrue(result.getStdout().contains("Delete source successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+    private static void deleteSink(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("Deleted successfully"));
+        assertTrue(result.getStderr().isEmpty());
+    }
+
+    private static void getSourceInfoNotFound(String sourceName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sources",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sourceName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
+        }
+    }
+
+    private static void getSinkInfoNotFound(String sinkName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sinks",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sinkName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
+        }
+    }
+
 }