You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/11/20 06:02:36 UTC
[pulsar] branch master updated: Close reader for metadata topic during initialization (#8637)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95755fd Close reader for metadata topic during initialization (#8637)
95755fd is described below
commit 95755fdc7e6f03e499ddb147c3f73abc6fc0c17f
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 19 22:02:05 2020 -0800
Close reader for metadata topic during initialization (#8637)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../pulsar/functions/worker/FunctionMetaDataManager.java | 4 ++--
.../pulsar/functions/worker/FunctionRuntimeManager.java | 13 +++++--------
2 files changed, 7 insertions(+), 10 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index e1fab33..dfb3a90 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -97,9 +97,9 @@ public class FunctionMetaDataManager implements AutoCloseable {
* We create a new reader
*/
public synchronized void initialize() {
- try {
+ try (Reader reader = FunctionMetaDataTopicTailer.createReader(
+ workerConfig, pulsarClient.newReader(), MessageId.earliest)){
// read all existing messages
- Reader reader = FunctionMetaDataTopicTailer.createReader(workerConfig, pulsarClient.newReader(), MessageId.earliest);
while (reader.hasMessageAvailable()) {
processMetaDataTopicMessage(reader.readNext());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 852a2b2..e359e83 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -227,12 +227,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
* @return the message id of the message processed during init phase
*/
public MessageId initialize() {
- try {
- Reader<byte[]> reader = WorkerUtils.createReader(
- workerService.getClient().newReader(),
- workerConfig.getWorkerId() + "-function-assignment-initialize",
- workerConfig.getFunctionAssignmentTopic(),
- MessageId.earliest);
+ try (Reader<byte[]> reader = WorkerUtils.createReader (
+ workerService.getClient().newReader(),
+ workerConfig.getWorkerId() + "-function-assignment-initialize",
+ workerConfig.getFunctionAssignmentTopic(),
+ MessageId.earliest)) {
// start init phase
this.isInitializePhase = true;
@@ -246,8 +245,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
// init phase is done
this.isInitializePhase = false;
- // close reader
- reader.close();
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {