You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2024/01/05 03:46:27 UTC

(pulsar) branch branch-3.0 updated: [fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0c497f2aadc [fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
0c497f2aadc is described below

commit 0c497f2aadc5075deff46c2807ba855517340c18
Author: Kai Wang <kw...@apache.org>
AuthorDate: Mon Dec 25 21:22:03 2023 +0800

    [fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
    
    (cherry picked from commit 840f87c79e2f93242708dd9afe512d0f49f421a0)
---
 .../store/TableViewLoadDataStoreImpl.java          | 34 +++++++++++++---------
 .../extensions/store/LoadDataStoreTest.java        | 30 +++++++++++++++++++
 2 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index ead0a7081fd..56afbef0456 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * The load data store, base on {@link TableView <T>}.
@@ -58,40 +59,46 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     @Override
-    public CompletableFuture<Void> pushAsync(String key, T loadData) {
+    public synchronized CompletableFuture<Void> pushAsync(String key, T loadData) {
+        if (producer == null) {
+            return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
+        }
         return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
     }
 
     @Override
-    public CompletableFuture<Void> removeAsync(String key) {
+    public synchronized CompletableFuture<Void> removeAsync(String key) {
+        if (producer == null) {
+            return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
+        }
         return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
     }
 
     @Override
-    public Optional<T> get(String key) {
+    public synchronized Optional<T> get(String key) {
         validateTableViewStart();
         return Optional.ofNullable(tableView.get(key));
     }
 
     @Override
-    public void forEach(BiConsumer<String, T> action) {
+    public synchronized void forEach(BiConsumer<String, T> action) {
         validateTableViewStart();
         tableView.forEach(action);
     }
 
-    public Set<Map.Entry<String, T>> entrySet() {
+    public synchronized Set<Map.Entry<String, T>> entrySet() {
         validateTableViewStart();
         return tableView.entrySet();
     }
 
     @Override
-    public int size() {
+    public synchronized int size() {
         validateTableViewStart();
         return tableView.size();
     }
 
     @Override
-    public void closeTableView() throws IOException {
+    public synchronized void closeTableView() throws IOException {
         if (tableView != null) {
             tableView.close();
             tableView = null;
@@ -99,13 +106,13 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     @Override
-    public void start() throws LoadDataStoreException {
+    public synchronized void start() throws LoadDataStoreException {
         startProducer();
         startTableView();
     }
 
     @Override
-    public void startTableView() throws LoadDataStoreException {
+    public synchronized void startTableView() throws LoadDataStoreException {
         if (tableView == null) {
             try {
                 tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
@@ -117,7 +124,7 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     @Override
-    public void startProducer() throws LoadDataStoreException {
+    public synchronized void startProducer() throws LoadDataStoreException {
         if (producer == null) {
             try {
                 producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
@@ -129,7 +136,7 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     @Override
-    public void close() throws IOException {
+    public synchronized void close() throws IOException {
         if (producer != null) {
             producer.close();
             producer = null;
@@ -138,15 +145,14 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
     }
 
     @Override
-    public void init() throws IOException {
+    public synchronized void init() throws IOException {
         close();
         start();
     }
 
-    private void validateTableViewStart() {
+    private synchronized void validateTableViewStart() {
         if (tableView == null) {
             throw new IllegalStateException("table view has not been started");
         }
     }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 7431b9815f9..f486370400c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -39,6 +40,7 @@ import org.testng.annotations.Test;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 @Test(groups = "broker")
 public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@@ -154,4 +156,32 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2));
     }
 
+    @Test
+    public void testProducerStop() throws Exception {
+        String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+        LoadDataStore<Integer> loadDataStore =
+                LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+        loadDataStore.startProducer();
+        loadDataStore.pushAsync("1", 1).get();
+        loadDataStore.removeAsync("1").get();
+
+        loadDataStore.close();
+
+        try {
+            loadDataStore.pushAsync("2", 2).get();
+            fail();
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof IllegalStateException);
+        }
+        try {
+            loadDataStore.removeAsync("2").get();
+            fail();
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof IllegalStateException);
+        }
+        loadDataStore.startProducer();
+        loadDataStore.pushAsync("3", 3).get();
+        loadDataStore.removeAsync("3").get();
+    }
+
 }