You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/20 06:02:36 UTC

[pulsar] branch master updated: Close reader for metadata topic during initialization (#8637)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95755fd  Close reader for metadata topic during initialization (#8637)
95755fd is described below

commit 95755fdc7e6f03e499ddb147c3f73abc6fc0c17f
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 19 22:02:05 2020 -0800

    Close reader for metadata topic during initialization (#8637)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../pulsar/functions/worker/FunctionMetaDataManager.java    |  4 ++--
 .../pulsar/functions/worker/FunctionRuntimeManager.java     | 13 +++++--------
 2 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index e1fab33..dfb3a90 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -97,9 +97,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
      * We create a new reader
      */
     public synchronized void initialize() {
-        try {
+        try (Reader reader = FunctionMetaDataTopicTailer.createReader(
+          workerConfig, pulsarClient.newReader(), MessageId.earliest)){
             // read all existing messages
-            Reader reader = FunctionMetaDataTopicTailer.createReader(workerConfig, pulsarClient.newReader(), MessageId.earliest);
             while (reader.hasMessageAvailable()) {
                 processMetaDataTopicMessage(reader.readNext());
             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 852a2b2..e359e83 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -227,12 +227,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
      * @return the message id of the message processed during init phase
      */
     public MessageId initialize() {
-        try {
-            Reader<byte[]> reader = WorkerUtils.createReader(
-                    workerService.getClient().newReader(),
-                    workerConfig.getWorkerId() + "-function-assignment-initialize",
-                    workerConfig.getFunctionAssignmentTopic(),
-                    MessageId.earliest);
+        try (Reader<byte[]> reader = WorkerUtils.createReader (
+          workerService.getClient().newReader(),
+          workerConfig.getWorkerId() + "-function-assignment-initialize",
+          workerConfig.getFunctionAssignmentTopic(),
+          MessageId.earliest)) {
 
             // start init phase
             this.isInitializePhase = true;
@@ -246,8 +245,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
             }
             // init phase is done
             this.isInitializePhase = false;
-            // close reader
-            reader.close();
             // realize existing assignments
             Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
             if (assignmentMap != null) {