You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/04/30 09:07:06 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 7866569 [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)
7866569 is described below
commit 786656958628b27f48c20745eb9697c329ea72dc
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Apr 30 09:07:00 2020 +0000
[TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../client/factory/TubeMultiSessionFactory.java | 15 ++++++++++++-
.../client/factory/TubeSingleSessionFactory.java | 26 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
index 3dbb561..1cf91be 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
@@ -34,12 +34,13 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
private final NettyClientFactory clientFactory = new NettyClientFactory();
private final TubeBaseSessionFactory baseSessionFactory;
- private final AtomicBoolean isShutDown = new AtomicBoolean(false);
+ private final AtomicBoolean isShutDown = new AtomicBoolean(true);
public TubeMultiSessionFactory(final TubeClientConfig tubeClientConfig) throws TubeClientException {
RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, false);
clientFactory.configure(config);
baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig);
+ isShutDown.set(false);
}
@Override
@@ -52,23 +53,35 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
@Override
public <T extends Shutdownable> void removeClient(final T client) {
+ if (baseSessionFactory == null) {
+ return;
+ }
this.baseSessionFactory.removeClient(client);
}
@Override
public MessageProducer createProducer() throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return this.baseSessionFactory.createProducer();
}
@Override
public PushMessageConsumer createPushConsumer(final ConsumerConfig consumerConfig)
throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return this.baseSessionFactory.createPushConsumer(consumerConfig);
}
@Override
public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return this.baseSessionFactory.createPullConsumer(consumerConfig);
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
index a574bf4..1532645 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
@@ -17,6 +17,7 @@
package org.apache.tubemq.client.factory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.config.TubeClientConfig;
@@ -33,6 +34,7 @@ import org.apache.tubemq.corerpc.netty.NettyClientFactory;
public class TubeSingleSessionFactory implements MessageSessionFactory {
private static final NettyClientFactory clientFactory = new NettyClientFactory();
+ private static final AtomicBoolean isShutDown = new AtomicBoolean(true);
private static final AtomicLong referenceCounter = new AtomicLong(0);
private static TubeBaseSessionFactory baseSessionFactory;
@@ -42,37 +44,61 @@ public class TubeSingleSessionFactory implements MessageSessionFactory {
RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, true);
clientFactory.configure(config);
baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig);
+ isShutDown.set(false);
+ }
+ while (isShutDown.get()) {
+ try {
+ Thread.sleep(50);
+ } catch (Throwable e) {
+ break;
+ }
}
}
@Override
public void shutdown() throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
if (referenceCounter.decrementAndGet() > 0) {
return;
}
baseSessionFactory.shutdown();
clientFactory.shutdown();
+ isShutDown.set(true);
}
@Override
public <T extends Shutdownable> void removeClient(final T client) {
+ if (baseSessionFactory == null) {
+ return;
+ }
baseSessionFactory.removeClient(client);
}
@Override
public MessageProducer createProducer() throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return baseSessionFactory.createProducer();
}
@Override
public PushMessageConsumer createPushConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return baseSessionFactory.createPushConsumer(consumerConfig);
}
@Override
public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
return baseSessionFactory.createPullConsumer(consumerConfig);
}