You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/05 02:52:52 UTC

[pulsar] branch master updated: Attach names for all producers/readers in worker service (#7165)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7add930  Attach names for all producers/readers in worker service (#7165)
7add930 is described below

commit 7add93095a88c8acfeae214ce8a1d856a54b474d
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Jun 4 19:52:38 2020 -0700

    Attach names for all producers/readers in worker service (#7165)
    
    * Attach names for all producers/readers in worker service
    
    * Fix tests
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/functions/worker/FunctionMetaDataManager.java  | 6 +++++-
 .../apache/pulsar/functions/worker/FunctionRuntimeManager.java   | 1 +
 .../org/apache/pulsar/functions/worker/SchedulerManager.java     | 1 +
 .../apache/pulsar/functions/worker/rest/api/ComponentImpl.java   | 9 ++++++++-
 .../pulsar/functions/worker/FunctionMetaDataManagerTest.java     | 1 +
 .../pulsar/functions/worker/FunctionRuntimeManagerTest.java      | 1 +
 .../org/apache/pulsar/functions/worker/SchedulerManagerTest.java | 1 +
 7 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index c3273fa..2ab913b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -91,6 +91,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
             Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
+                    .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
                     .create();
 
             this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
@@ -432,6 +433,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
     }
 
     private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
-        return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
+        return new ServiceRequestManager(pulsarClient.newProducer()
+                .topic(functionMetadataTopic)
+                .producerName(workerConfig.getWorkerId() + "-function-metadata-manager")
+                .create());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 075146e..90cab3f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -211,6 +211,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         log.info("/** Initializing Runtime Manager **/");
         try {
             Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
+                    .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
                     .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
                     .startMessageId(MessageId.earliest).create();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index bcbccda..9c93443 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -106,6 +106,7 @@ public class SchedulerManager implements AutoCloseable {
                                 .blockIfQueueFull(true)
                                 .compressionType(CompressionType.LZ4)
                                 .sendTimeout(0, TimeUnit.MILLISECONDS)
+                                .producerName(config.getWorkerId() + "-scheduler-manager")
                                 .createAsync().get(10, TimeUnit.SECONDS);
                         return Actions.ActionResult.builder().success(true).result(producer).build();
                     } catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0b128d9..f49971b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -994,10 +994,17 @@ public abstract class ComponentImpl {
         Producer<byte[]> producer = null;
         try {
             if (outputTopic != null && !outputTopic.isEmpty()) {
-                reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
+                reader = worker().getClient().newReader()
+                        .topic(outputTopic)
+                        .startMessageId(MessageId.latest)
+                        .readerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+                                FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
+                        .create();
             }
             producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
                     .topic(inputTopicToWrite)
+                    .producerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+                            FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
                     .create();
             byte[] targetArray;
             if (uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 75be623..da6852c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -48,6 +48,7 @@ public class FunctionMetaDataManagerTest {
     private static PulsarClient mockPulsarClient() throws PulsarClientException {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
 
         when(builder.create()).thenReturn(mock(Producer.class));
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 2e6ed5b..4230304 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -562,6 +562,7 @@ public class FunctionRuntimeManagerTest {
         ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 243f127..fafed1f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -110,6 +110,7 @@ public class SchedulerManagerTest {
 
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.producerName(anyString())).thenReturn(builder);
         when(builder.enableBatching(anyBoolean())).thenReturn(builder);
         when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
         when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);