You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/04/15 02:07:23 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-67] remove synchronized & fix some typos (#53)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d02ff5  [TUBEMQ-67] remove synchronized & fix some typos (#53)
1d02ff5 is described below

commit 1d02ff5b039be403381dcf29ceab077d4106a714
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Apr 15 10:07:13 2020 +0800

    [TUBEMQ-67] remove synchronized & fix some typos (#53)
---
 .../tubemq/client/config/TubeClientConfig.java     | 38 +++++++++++-----------
 .../client/config/TubeClientConfigUtils.java       |  2 +-
 .../client/consumer/BaseMessageConsumer.java       | 10 +++---
 .../client/consumer/MessageFetchManager.java       |  6 ++--
 .../client/factory/TubeBaseSessionFactory.java     |  2 +-
 .../tubemq/client/producer/ProducerManager.java    |  2 +-
 .../client/producer/SimpleMessageProducer.java     |  2 +-
 .../factory/TubeMultiSessionFactoryTest.java       |  2 +-
 .../factory/TubeSingleSessionFactoryTest.java      |  2 +-
 .../apache/tubemq/corebase/cluster/MasterInfo.java |  2 +-
 .../tubemq/corerpc/RpcServiceFailoverInvoker.java  |  2 +-
 11 files changed, 35 insertions(+), 35 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
index 5e9c175..3a63469 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
@@ -33,9 +33,9 @@ public class TubeClientConfig {
     // Rpc read time out.
     private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
     // Rpc connection processor number.
-    private int rpcConnProcesserCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
+    private int rpcConnProcessorCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
     // Netty memory size.
-    private int rpcNnettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
+    private int rpcNettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
     // The size of the thread pool, which handles the call back response.
     private int rpcRspCallBackThreadCnt = RpcConstants.CFG_DEFAULT_RSP_CALLBACK_WORKER_COUNT;
     // High watermark of the netty write buffer.
@@ -154,15 +154,15 @@ public class TubeClientConfig {
         this.heartbeatPeriodMs = heartbeatPeriodMs;
     }
 
-    public int getRpcConnProcesserCnt() {
-        return this.rpcConnProcesserCnt;
+    public int getRpcConnProcessorCnt() {
+        return this.rpcConnProcessorCnt;
     }
 
-    public void setRpcConnProcesserCnt(int rpcConnProcesserCnt) {
-        if (rpcConnProcesserCnt <= 0) {
-            this.rpcConnProcesserCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
+    public void setRpcConnProcessorCnt(int rpcConnProcessorCnt) {
+        if (rpcConnProcessorCnt <= 0) {
+            this.rpcConnProcessorCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
         } else {
-            this.rpcConnProcesserCnt = rpcConnProcesserCnt;
+            this.rpcConnProcessorCnt = rpcConnProcessorCnt;
         }
     }
 
@@ -175,19 +175,19 @@ public class TubeClientConfig {
     }
 
     public int getRpcNettyWorkMemorySize() {
-        return rpcNnettyWorkMemorySize;
+        return rpcNettyWorkMemorySize;
     }
 
     /**
      * Set the netty work memory size. Please notice that the value must be larger than 0.
      *
-     * @param rpcNnettyWorkMemorySize netty work memory size.
+     * @param rpcNettyWorkMemorySize netty work memory size.
      */
-    public void setRpcNettyWorkMemorySize(int rpcNnettyWorkMemorySize) {
-        if (rpcNnettyWorkMemorySize <= 0) {
-            this.rpcNnettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
+    public void setRpcNettyWorkMemorySize(int rpcNettyWorkMemorySize) {
+        if (rpcNettyWorkMemorySize <= 0) {
+            this.rpcNettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
         } else {
-            this.rpcNnettyWorkMemorySize = rpcNnettyWorkMemorySize;
+            this.rpcNettyWorkMemorySize = rpcNettyWorkMemorySize;
         }
     }
 
@@ -461,10 +461,10 @@ public class TubeClientConfig {
         if (rpcReadTimeoutMs != that.rpcReadTimeoutMs) {
             return false;
         }
-        if (rpcConnProcesserCnt != that.rpcConnProcesserCnt) {
+        if (rpcConnProcessorCnt != that.rpcConnProcessorCnt) {
             return false;
         }
-        if (rpcNnettyWorkMemorySize != that.rpcNnettyWorkMemorySize) {
+        if (rpcNettyWorkMemorySize != that.rpcNettyWorkMemorySize) {
             return false;
         }
         if (rpcRspCallBackThreadCnt != that.rpcRspCallBackThreadCnt) {
@@ -546,15 +546,15 @@ public class TubeClientConfig {
         int num = 0;
         StringBuilder sBuilder = new StringBuilder(512);
         sBuilder.append("{\"masterInfo\":[");
-        for (String item : this.masterInfo.getAddrMap4failover().keySet()) {
+        for (String item : this.masterInfo.getAddrMap4Failover().keySet()) {
             if (num++ > 0) {
                 sBuilder.append(",");
             }
             sBuilder.append("\"").append(item).append("\"");
         }
         return sBuilder.append("],\"rpcReadTimeoutMs\":").append(this.rpcReadTimeoutMs)
-            .append(",\"rpcConnProcesserCnt\":").append(this.rpcConnProcesserCnt)
-            .append(",\"rpcNnettyWorkMemorySize\":").append(this.rpcNnettyWorkMemorySize)
+            .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
+            .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
             .append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
             .append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
             .append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
index 5e6323e..79a8548 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
@@ -46,7 +46,7 @@ public class TubeClientConfigUtils {
                 tubeClientConfig.getNettyWriteBufferHighWaterMark());
         config.put(RpcConstants.NETTY_WRITE_LOW_MARK,
                 tubeClientConfig.getNettyWriteBufferLowWaterMark());
-        config.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcesserCnt());
+        config.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcessorCnt());
         if (isSingleSession) {
             config.put(RpcConstants.WORKER_THREAD_NAME, "tube_single_netty_worker-");
         } else {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 8699660..7ec0105 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -269,12 +269,12 @@ public class BaseMessageConsumer implements MessageConsumer {
                             new TopicProcessor(messageListener, filterConds));
             if (oldProcessor != null) {
                 throw new TubeClientException(new StringBuilder(256).append("Topic=")
-                        .append(topic).append(" has been subscribered").toString());
+                        .append(topic).append(" has been subscribed").toString());
             }
             return this;
         } else {
             throw new TubeClientException(new StringBuilder(256).append("Topic=")
-                    .append(topic).append(" has been subscribered").toString());
+                    .append(topic).append(" has been subscribed").toString());
         }
     }
 
@@ -331,7 +331,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                     }
                     if (!consumeSubInfo.isSubscribedTopicContain(partitionKeyItems[1].trim())) {
                         throw new TubeClientException(new StringBuilder(256)
-                                .append("Parameter error: not included in subcribed topic list: ")
+                                .append("Parameter error: not included in subscribed topic list: ")
                                 .append("partOffsetMap's key is ")
                                 .append(entry.getKey()).append(", subscribed topics are ")
                                 .append(consumeSubInfo.getSubscribedTopics().toString()).toString());
@@ -529,7 +529,7 @@ public class BaseMessageConsumer implements MessageConsumer {
      * @return consumer id
      * @throws Exception
      */
-    private synchronized String generateConsumerID() throws Exception {
+    private String generateConsumerID() throws Exception {
         String pidName = ManagementFactory.getRuntimeMXBean().getName();
         if (pidName != null && pidName.contains("@")) {
             pidName = pidName.split("@")[0];
@@ -711,7 +711,7 @@ public class BaseMessageConsumer implements MessageConsumer {
         return rmtDataCache.pushSelect();
     }
 
-    protected void pushReqReleasePartiton(String partitionKey,
+    protected void pushReqReleasePartition(String partitionKey,
                                           long usedTime,
                                           boolean isLastPackConsumed) {
         rmtDataCache.errReqRelease(partitionKey, usedTime, isLastPackConsumed);
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
index 959f7f2..6212bc3 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
@@ -186,7 +186,7 @@ public class MessageFetchManager {
                     if (isShutdown()) {
                         MessageFetchManager.this.pushConsumer
                                 .getBaseConsumer()
-                                .pushReqReleasePartiton(partition.getPartitionKey(), usedToken, isLastConsumed);
+                                .pushReqReleasePartition(partition.getPartitionKey(), usedToken, isLastConsumed);
                         partSelectResult = null;
                         break;
                     }
@@ -198,7 +198,7 @@ public class MessageFetchManager {
                                             .getBaseConsumer().flushLastRequest(partition);
                         }
                         MessageFetchManager.this.pushConsumer
-                                .getBaseConsumer().pushReqReleasePartiton(partition.getPartitionKey(),
+                                .getBaseConsumer().pushReqReleasePartition(partition.getPartitionKey(),
                                 usedToken, result);
                         partSelectResult = null;
                         continue;
@@ -207,7 +207,7 @@ public class MessageFetchManager {
                     if (partSelectResult != null) {
                         MessageFetchManager.this.pushConsumer
                                 .getBaseConsumer()
-                                .pushReqReleasePartiton(partSelectResult.getPartition().getPartitionKey(),
+                                .pushReqReleasePartition(partSelectResult.getPartition().getPartitionKey(),
                                         partSelectResult.getUsedToken(), false);
                     }
                     sBuilder.delete(0, sBuilder.length());
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
index 66c08e1..2469bf3 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -91,7 +91,7 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
             throw new TubeClientException("null configuration");
         }
         MasterInfo masterInfo = tubeClientConfig.getMasterInfo();
-        if ((masterInfo == null) || masterInfo.getAddrMap4failover().isEmpty()) {
+        if ((masterInfo == null) || masterInfo.getAddrMap4Failover().isEmpty()) {
             throw new TubeClientException("Blank MasterInfo content in ClientConfig");
         }
     }
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index 6811aef..d3722a9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -122,7 +122,7 @@ public class ProducerManager {
                 tubeClientConfig.getNettyWriteBufferHighWaterMark());
         rpcConfig.put(RpcConstants.NETTY_WRITE_LOW_MARK,
                 tubeClientConfig.getNettyWriteBufferLowWaterMark());
-        rpcConfig.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcesserCnt());
+        rpcConfig.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcessorCnt());
         rpcConfig.put(RpcConstants.WORKER_THREAD_NAME, "tube_netty_worker-");
         rpcConfig.put(RpcConstants.CALLBACK_WORKER_COUNT,
                 tubeClientConfig.getRpcRspCallBackThreadCnt());
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
index 257e9d8..e0544ef 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
@@ -87,7 +87,7 @@ public class SimpleMessageProducer implements MessageProducer {
         this.rpcConfig.put(RpcConstants.NETTY_WRITE_LOW_MARK,
             tubeClientConfig.getNettyWriteBufferLowWaterMark());
         this.rpcConfig.put(RpcConstants.WORKER_COUNT,
-            tubeClientConfig.getRpcConnProcesserCnt());
+            tubeClientConfig.getRpcConnProcessorCnt());
         this.rpcConfig.put(RpcConstants.WORKER_THREAD_NAME,
             "tube_producer_netty_worker-");
         this.rpcConfig.put(RpcConstants.WORKER_MEM_SIZE,
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
index 5c0b893..32e1b21 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
@@ -35,7 +35,7 @@ public class TubeMultiSessionFactoryTest {
     @Test
     public void testTubeMultiSessionFactory() throws Exception {
         TubeClientConfig config = mock(TubeClientConfig.class);
-        when(config.getRpcConnProcesserCnt()).thenReturn(1);
+        when(config.getRpcConnProcessorCnt()).thenReturn(1);
         when(config.getRpcRspCallBackThreadCnt()).thenReturn(1);
         when(config.getMasterInfo()).thenReturn(new MasterInfo("192.168.1.1:18080"));
 
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
index bbb29ea..66daa28 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
@@ -34,7 +34,7 @@ public class TubeSingleSessionFactoryTest {
     @Test
     public void testTubeSingleSessionFactory() throws Exception {
         TubeClientConfig config = mock(TubeClientConfig.class);
-        when(config.getRpcConnProcesserCnt()).thenReturn(1);
+        when(config.getRpcConnProcessorCnt()).thenReturn(1);
         when(config.getRpcRspCallBackThreadCnt()).thenReturn(1);
         when(config.getMasterInfo()).thenReturn(new MasterInfo("192.168.1.1:18080"));
 
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
index b82cb05..2e1e211 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
@@ -106,7 +106,7 @@ public class MasterInfo {
         this.masterClusterStr = masterClusterStr;
     }
 
-    public Map<String, NodeAddrInfo> getAddrMap4failover() {
+    public Map<String, NodeAddrInfo> getAddrMap4Failover() {
         return addrMap4Failover;
     }
 
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
index 9759528..8c90bb8 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
@@ -131,7 +131,7 @@ public class RpcServiceFailoverInvoker extends AbstractServiceInvoker {
             while (client == null || !client.isReady()) {
                 String nodeKey =
                         addressList.get((retryCounter.getAndIncrement() & Integer.MAX_VALUE) % masterNodeCnt);
-                NodeAddrInfo nodeAddrInfo = masterInfo.getAddrMap4failover().get(nodeKey);
+                NodeAddrInfo nodeAddrInfo = masterInfo.getAddrMap4Failover().get(nodeKey);
                 try {
                     client = clientFactory.getClient(nodeAddrInfo, conf);
                 } catch (Throwable e) {