You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/04/30 09:07:06 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 7866569  [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)
7866569 is described below

commit 786656958628b27f48c20745eb9697c329ea72dc
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Apr 30 09:07:00 2020 +0000

    [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../client/factory/TubeMultiSessionFactory.java    | 15 ++++++++++++-
 .../client/factory/TubeSingleSessionFactory.java   | 26 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
index 3dbb561..1cf91be 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java
@@ -34,12 +34,13 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
 
     private final NettyClientFactory clientFactory = new NettyClientFactory();
     private final TubeBaseSessionFactory baseSessionFactory;
-    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
+    private final AtomicBoolean isShutDown = new AtomicBoolean(true);
 
     public TubeMultiSessionFactory(final TubeClientConfig tubeClientConfig) throws TubeClientException {
         RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, false);
         clientFactory.configure(config);
         baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig);
+        isShutDown.set(false);
     }
 
     @Override
@@ -52,23 +53,35 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
 
     @Override
     public <T extends Shutdownable> void removeClient(final T client) {
+        if (baseSessionFactory == null) {
+            return;
+        }
         this.baseSessionFactory.removeClient(client);
     }
 
     @Override
     public MessageProducer createProducer() throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return this.baseSessionFactory.createProducer();
     }
 
     @Override
     public PushMessageConsumer createPushConsumer(final ConsumerConfig consumerConfig)
             throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return this.baseSessionFactory.createPushConsumer(consumerConfig);
     }
 
     @Override
     public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
             throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return this.baseSessionFactory.createPullConsumer(consumerConfig);
     }
 
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
index a574bf4..1532645 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.tubemq.client.factory;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.client.config.ConsumerConfig;
 import org.apache.tubemq.client.config.TubeClientConfig;
@@ -33,6 +34,7 @@ import org.apache.tubemq.corerpc.netty.NettyClientFactory;
 public class TubeSingleSessionFactory implements MessageSessionFactory {
 
     private static final NettyClientFactory clientFactory = new NettyClientFactory();
+    private static final AtomicBoolean isShutDown = new AtomicBoolean(true);
     private static final AtomicLong referenceCounter = new AtomicLong(0);
     private static TubeBaseSessionFactory baseSessionFactory;
 
@@ -42,37 +44,61 @@ public class TubeSingleSessionFactory implements MessageSessionFactory {
             RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, true);
             clientFactory.configure(config);
             baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig);
+            isShutDown.set(false);
+        }
+        while (isShutDown.get()) {
+            try {
+                Thread.sleep(50);
+            } catch (Throwable e) {
+                break;
+            }
         }
     }
 
     @Override
     public void shutdown() throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         if (referenceCounter.decrementAndGet() > 0) {
             return;
         }
         baseSessionFactory.shutdown();
         clientFactory.shutdown();
+        isShutDown.set(true);
     }
 
     @Override
     public <T extends Shutdownable> void removeClient(final T client) {
+        if (baseSessionFactory == null) {
+            return;
+        }
         baseSessionFactory.removeClient(client);
     }
 
     @Override
     public MessageProducer createProducer() throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return baseSessionFactory.createProducer();
     }
 
     @Override
     public PushMessageConsumer createPushConsumer(ConsumerConfig consumerConfig)
             throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return baseSessionFactory.createPushConsumer(consumerConfig);
     }
 
     @Override
     public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
             throws TubeClientException {
+        if (isShutDown.get()) {
+            throw new TubeClientException("Please initialize the object first!");
+        }
         return baseSessionFactory.createPullConsumer(consumerConfig);
     }