You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/25 22:05:34 UTC

[GitHub] [pulsar] david-streamlio opened a new pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

david-streamlio opened a new pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727


   Fixes #9402
   
   We needed a new type of triggerer for batch sources that is triggered exactly once, e.g. a S3 source that receives CDC notifications via an SQS topic. In this scenario, the discover phase only needs to run once to setup the SQS connection, etc.
   
   Replaces PR: https://github.com/apache/pulsar/pull/9406 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#discussion_r583243589



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -217,6 +256,9 @@ void setup(Method method) throws Exception {
             if (connectorsDir.mkdir()) {
                 File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
                 Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
+                
+                file = new File(getClass().getClassLoader().getResource("pulsar-io-batch-data-generator.nar").getFile());

Review comment:
       Since you are copying the connector into the "connectors" dir, please also add a test to test submitting the builtin version of the connector.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] david-streamlio commented on pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#issuecomment-786935007


   @sijie @eolivelli   I replaced the previous PR with this one, and wanted to get your approval before proceeding. I have incorporated all of the changes you requested. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] david-streamlio commented on a change in pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on a change in pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#discussion_r583327144



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -1143,6 +1178,145 @@ public void testPulsarSourceStatsWithUrl() throws Exception {
                 fileServer.getAddress().getPort());
         testPulsarSourceStats(jarFilePathUrl);
     }
+    
+    private void testPulsarBatchSourceStats(String jarFilePathUrl) throws Exception {
+    	final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sourceName = "PulsarBatchSource";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+        sourceConfig.setBatchSourceConfig(createBatchSourceConfig());
+        
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output-" + sourceName;
+        sourceConfig.setTopicName(sinkTopic2);
+        admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertNotNull(sourceStats.publishers.get(0).metadata);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic2).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic2, false).numberOfEntries > 4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
+
+        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_source_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+

Review comment:
       The readNext() method of the BatchDataGeneratorSource has a call to Thread.sleep(50; in it, so the number of messages is not determined by how long each individual test runs. Thus it is NOT deterministic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#issuecomment-786267138


   @david-streamlio I don't think you have added the ImmediateTrigger class to the PR


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#discussion_r583241360



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -1143,6 +1178,145 @@ public void testPulsarSourceStatsWithUrl() throws Exception {
                 fileServer.getAddress().getPort());
         testPulsarSourceStats(jarFilePathUrl);
     }
+    
+    private void testPulsarBatchSourceStats(String jarFilePathUrl) throws Exception {
+    	final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sourceName = "PulsarBatchSource";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+        sourceConfig.setBatchSourceConfig(createBatchSourceConfig());
+        
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output-" + sourceName;
+        sourceConfig.setTopicName(sinkTopic2);
+        admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertNotNull(sourceStats.publishers.get(0).metadata);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic2).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic2, false).numberOfEntries > 4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
+
+        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_source_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+

Review comment:
       Please also delete the source at the end of test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng merged pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
jerrypeng merged pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] david-streamlio commented on pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
david-streamlio commented on pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#issuecomment-786915368


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #9727: Issue-9402: [functions] Added ImmediateTrigger class and unit tests

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #9727:
URL: https://github.com/apache/pulsar/pull/9727#discussion_r583242252



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
##########
@@ -1143,6 +1178,145 @@ public void testPulsarSourceStatsWithUrl() throws Exception {
                 fileServer.getAddress().getPort());
         testPulsarSourceStats(jarFilePathUrl);
     }
+    
+    private void testPulsarBatchSourceStats(String jarFilePathUrl) throws Exception {
+    	final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sourceName = "PulsarBatchSource";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+        sourceConfig.setBatchSourceConfig(createBatchSourceConfig());
+        
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
+        final String sinkTopic2 = "persistent://" + replNamespace + "/output-" + sourceName;
+        sourceConfig.setTopicName(sinkTopic2);
+        admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic2);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertNotNull(sourceStats.publishers.get(0).metadata);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic2).publishers.size() == 1) && (admin.topics().getInternalStats(sinkTopic2, false).numberOfEntries > 4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
+
+        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        log.info("prometheusMetrics: {}", prometheusMetrics);
+
+        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
+        Metric m = metrics.get("pulsar_source_received_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_received_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_written_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_source_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_system_exceptions_total_1min");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertEquals(m.value, 0.0);
+        m = metrics.get("pulsar_source_last_invocation");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sourceName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
+        assertTrue(m.value > 0.0);
+

Review comment:
       Since you using a trigger that triggers on once, the number of messages produced by the data generator batch source should be deterministic.  Please create a reader and read from the output topic to make sure you got the correct number of messages.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org