You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/06/05 02:52:52 UTC
[pulsar] branch master updated: Attach names for all producers/readers in worker service (#7165)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7add930 Attach names for all producers/readers in worker service (#7165)
7add930 is described below
commit 7add93095a88c8acfeae214ce8a1d856a54b474d
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Jun 4 19:52:38 2020 -0700
Attach names for all producers/readers in worker service (#7165)
* Attach names for all producers/readers in worker service
* Fix tests
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/functions/worker/FunctionMetaDataManager.java | 6 +++++-
.../apache/pulsar/functions/worker/FunctionRuntimeManager.java | 1 +
.../org/apache/pulsar/functions/worker/SchedulerManager.java | 1 +
.../apache/pulsar/functions/worker/rest/api/ComponentImpl.java | 9 ++++++++-
.../pulsar/functions/worker/FunctionMetaDataManagerTest.java | 1 +
.../pulsar/functions/worker/FunctionRuntimeManagerTest.java | 1 +
.../org/apache/pulsar/functions/worker/SchedulerManagerTest.java | 1 +
7 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index c3273fa..2ab913b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -91,6 +91,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
+ .readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
.create();
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
@@ -432,6 +433,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
}
private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
- return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
+ return new ServiceRequestManager(pulsarClient.newProducer()
+ .topic(functionMetadataTopic)
+ .producerName(workerConfig.getWorkerId() + "-function-metadata-manager")
+ .create());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 075146e..90cab3f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -211,6 +211,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
log.info("/** Initializing Runtime Manager **/");
try {
Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
+ .readerName(workerConfig.getWorkerId() + "-function-runtime-manager")
.topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
.startMessageId(MessageId.earliest).create();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index bcbccda..9c93443 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -106,6 +106,7 @@ public class SchedulerManager implements AutoCloseable {
.blockIfQueueFull(true)
.compressionType(CompressionType.LZ4)
.sendTimeout(0, TimeUnit.MILLISECONDS)
+ .producerName(config.getWorkerId() + "-scheduler-manager")
.createAsync().get(10, TimeUnit.SECONDS);
return Actions.ActionResult.builder().success(true).result(producer).build();
} catch (Exception e) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 0b128d9..f49971b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -994,10 +994,17 @@ public abstract class ComponentImpl {
Producer<byte[]> producer = null;
try {
if (outputTopic != null && !outputTopic.isEmpty()) {
- reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
+ reader = worker().getClient().newReader()
+ .topic(outputTopic)
+ .startMessageId(MessageId.latest)
+ .readerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+ FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
+ .create();
}
producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES())
.topic(inputTopicToWrite)
+ .producerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" +
+ FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName))
.create();
byte[] targetArray;
if (uploadedInputStream != null) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 75be623..da6852c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -48,6 +48,7 @@ public class FunctionMetaDataManagerTest {
private static PulsarClient mockPulsarClient() throws PulsarClientException {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.producerName(anyString())).thenReturn(builder);
when(builder.create()).thenReturn(mock(Producer.class));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 2e6ed5b..4230304 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -562,6 +562,7 @@ public class FunctionRuntimeManagerTest {
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+ doReturn(readerBuilder).when(readerBuilder).readerName(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 243f127..fafed1f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -110,6 +110,7 @@ public class SchedulerManagerTest {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.producerName(anyString())).thenReturn(builder);
when(builder.enableBatching(anyBoolean())).thenReturn(builder);
when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);