You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2024/01/05 03:46:27 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0c497f2aadc [fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
0c497f2aadc is described below
commit 0c497f2aadc5075deff46c2807ba855517340c18
Author: Kai Wang <kw...@apache.org>
AuthorDate: Mon Dec 25 21:22:03 2023 +0800
[fix][broker] Fix TableViewLoadDataStoreImpl NPE (#21777)
(cherry picked from commit 840f87c79e2f93242708dd9afe512d0f49f421a0)
---
.../store/TableViewLoadDataStoreImpl.java | 34 +++++++++++++---------
.../extensions/store/LoadDataStoreTest.java | 30 +++++++++++++++++++
2 files changed, 50 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index ead0a7081fd..56afbef0456 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.FutureUtil;
/**
* The load data store, base on {@link TableView <T>}.
@@ -58,40 +59,46 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
@Override
- public CompletableFuture<Void> pushAsync(String key, T loadData) {
+ public synchronized CompletableFuture<Void> pushAsync(String key, T loadData) {
+ if (producer == null) {
+ return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
+ }
return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
}
@Override
- public CompletableFuture<Void> removeAsync(String key) {
+ public synchronized CompletableFuture<Void> removeAsync(String key) {
+ if (producer == null) {
+ return FutureUtil.failedFuture(new IllegalStateException("producer has not been started"));
+ }
return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
}
@Override
- public Optional<T> get(String key) {
+ public synchronized Optional<T> get(String key) {
validateTableViewStart();
return Optional.ofNullable(tableView.get(key));
}
@Override
- public void forEach(BiConsumer<String, T> action) {
+ public synchronized void forEach(BiConsumer<String, T> action) {
validateTableViewStart();
tableView.forEach(action);
}
- public Set<Map.Entry<String, T>> entrySet() {
+ public synchronized Set<Map.Entry<String, T>> entrySet() {
validateTableViewStart();
return tableView.entrySet();
}
@Override
- public int size() {
+ public synchronized int size() {
validateTableViewStart();
return tableView.size();
}
@Override
- public void closeTableView() throws IOException {
+ public synchronized void closeTableView() throws IOException {
if (tableView != null) {
tableView.close();
tableView = null;
@@ -99,13 +106,13 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
@Override
- public void start() throws LoadDataStoreException {
+ public synchronized void start() throws LoadDataStoreException {
startProducer();
startTableView();
}
@Override
- public void startTableView() throws LoadDataStoreException {
+ public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
@@ -117,7 +124,7 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
@Override
- public void startProducer() throws LoadDataStoreException {
+ public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
@@ -129,7 +136,7 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
@Override
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
if (producer != null) {
producer.close();
producer = null;
@@ -138,15 +145,14 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
}
@Override
- public void init() throws IOException {
+ public synchronized void init() throws IOException {
close();
start();
}
- private void validateTableViewStart() {
+ private synchronized void validateTableViewStart() {
if (tableView == null) {
throw new IllegalStateException("table view has not been started");
}
}
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index 7431b9815f9..f486370400c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertTrue;
import com.google.common.collect.Sets;
@@ -39,6 +40,7 @@ import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
@Test(groups = "broker")
public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@@ -154,4 +156,32 @@ public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2));
}
+ @Test
+ public void testProducerStop() throws Exception {
+ String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
+ LoadDataStore<Integer> loadDataStore =
+ LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class);
+ loadDataStore.startProducer();
+ loadDataStore.pushAsync("1", 1).get();
+ loadDataStore.removeAsync("1").get();
+
+ loadDataStore.close();
+
+ try {
+ loadDataStore.pushAsync("2", 2).get();
+ fail();
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof IllegalStateException);
+ }
+ try {
+ loadDataStore.removeAsync("2").get();
+ fail();
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof IllegalStateException);
+ }
+ loadDataStore.startProducer();
+ loadDataStore.pushAsync("3", 3).get();
+ loadDataStore.removeAsync("3").get();
+ }
+
}