You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/04/15 02:07:23 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-67] remove
synchronized & fix some typos (#53)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 1d02ff5 [TUBEMQ-67] remove synchronized & fix some typos (#53)
1d02ff5 is described below
commit 1d02ff5b039be403381dcf29ceab077d4106a714
Author: Tboy <gu...@immomo.com>
AuthorDate: Wed Apr 15 10:07:13 2020 +0800
[TUBEMQ-67] remove synchronized & fix some typos (#53)
---
.../tubemq/client/config/TubeClientConfig.java | 38 +++++++++++-----------
.../client/config/TubeClientConfigUtils.java | 2 +-
.../client/consumer/BaseMessageConsumer.java | 10 +++---
.../client/consumer/MessageFetchManager.java | 6 ++--
.../client/factory/TubeBaseSessionFactory.java | 2 +-
.../tubemq/client/producer/ProducerManager.java | 2 +-
.../client/producer/SimpleMessageProducer.java | 2 +-
.../factory/TubeMultiSessionFactoryTest.java | 2 +-
.../factory/TubeSingleSessionFactoryTest.java | 2 +-
.../apache/tubemq/corebase/cluster/MasterInfo.java | 2 +-
.../tubemq/corerpc/RpcServiceFailoverInvoker.java | 2 +-
11 files changed, 35 insertions(+), 35 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
index 5e9c175..3a63469 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
@@ -33,9 +33,9 @@ public class TubeClientConfig {
// Rpc read time out.
private long rpcReadTimeoutMs = RpcConstants.CFG_RPC_READ_TIMEOUT_DEFAULT_MS;
// Rpc connection processor number.
- private int rpcConnProcesserCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
+ private int rpcConnProcessorCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
// Netty memory size.
- private int rpcNnettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
+ private int rpcNettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
// The size of the thread pool, which handles the call back response.
private int rpcRspCallBackThreadCnt = RpcConstants.CFG_DEFAULT_RSP_CALLBACK_WORKER_COUNT;
// High watermark of the netty write buffer.
@@ -154,15 +154,15 @@ public class TubeClientConfig {
this.heartbeatPeriodMs = heartbeatPeriodMs;
}
- public int getRpcConnProcesserCnt() {
- return this.rpcConnProcesserCnt;
+ public int getRpcConnProcessorCnt() {
+ return this.rpcConnProcessorCnt;
}
- public void setRpcConnProcesserCnt(int rpcConnProcesserCnt) {
- if (rpcConnProcesserCnt <= 0) {
- this.rpcConnProcesserCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
+ public void setRpcConnProcessorCnt(int rpcConnProcessorCnt) {
+ if (rpcConnProcessorCnt <= 0) {
+ this.rpcConnProcessorCnt = RpcConstants.CFG_DEFAULT_CLIENT_WORKER_COUNT;
} else {
- this.rpcConnProcesserCnt = rpcConnProcesserCnt;
+ this.rpcConnProcessorCnt = rpcConnProcessorCnt;
}
}
@@ -175,19 +175,19 @@ public class TubeClientConfig {
}
public int getRpcNettyWorkMemorySize() {
- return rpcNnettyWorkMemorySize;
+ return rpcNettyWorkMemorySize;
}
/**
* Set the netty work memory size. Please notice that the value must be larger than 0.
*
- * @param rpcNnettyWorkMemorySize netty work memory size.
+ * @param rpcNettyWorkMemorySize netty work memory size.
*/
- public void setRpcNettyWorkMemorySize(int rpcNnettyWorkMemorySize) {
- if (rpcNnettyWorkMemorySize <= 0) {
- this.rpcNnettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
+ public void setRpcNettyWorkMemorySize(int rpcNettyWorkMemorySize) {
+ if (rpcNettyWorkMemorySize <= 0) {
+ this.rpcNettyWorkMemorySize = RpcConstants.CFG_DEFAULT_TOTAL_MEM_SIZE;
} else {
- this.rpcNnettyWorkMemorySize = rpcNnettyWorkMemorySize;
+ this.rpcNettyWorkMemorySize = rpcNettyWorkMemorySize;
}
}
@@ -461,10 +461,10 @@ public class TubeClientConfig {
if (rpcReadTimeoutMs != that.rpcReadTimeoutMs) {
return false;
}
- if (rpcConnProcesserCnt != that.rpcConnProcesserCnt) {
+ if (rpcConnProcessorCnt != that.rpcConnProcessorCnt) {
return false;
}
- if (rpcNnettyWorkMemorySize != that.rpcNnettyWorkMemorySize) {
+ if (rpcNettyWorkMemorySize != that.rpcNettyWorkMemorySize) {
return false;
}
if (rpcRspCallBackThreadCnt != that.rpcRspCallBackThreadCnt) {
@@ -546,15 +546,15 @@ public class TubeClientConfig {
int num = 0;
StringBuilder sBuilder = new StringBuilder(512);
sBuilder.append("{\"masterInfo\":[");
- for (String item : this.masterInfo.getAddrMap4failover().keySet()) {
+ for (String item : this.masterInfo.getAddrMap4Failover().keySet()) {
if (num++ > 0) {
sBuilder.append(",");
}
sBuilder.append("\"").append(item).append("\"");
}
return sBuilder.append("],\"rpcReadTimeoutMs\":").append(this.rpcReadTimeoutMs)
- .append(",\"rpcConnProcesserCnt\":").append(this.rpcConnProcesserCnt)
- .append(",\"rpcNnettyWorkMemorySize\":").append(this.rpcNnettyWorkMemorySize)
+ .append(",\"rpcConnProcessorCnt\":").append(this.rpcConnProcessorCnt)
+ .append(",\"rpcNettyWorkMemorySize\":").append(this.rpcNettyWorkMemorySize)
.append(",\"rpcRspCallBackThreadCnt\":").append(this.rpcRspCallBackThreadCnt)
.append(",\"nettyWriteBufferHighWaterMark\":").append(this.nettyWriteBufferHighWaterMark)
.append(",\"nettyWriteBufferLowWaterMark\":").append(this.nettyWriteBufferLowWaterMark)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
index 5e6323e..79a8548 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfigUtils.java
@@ -46,7 +46,7 @@ public class TubeClientConfigUtils {
tubeClientConfig.getNettyWriteBufferHighWaterMark());
config.put(RpcConstants.NETTY_WRITE_LOW_MARK,
tubeClientConfig.getNettyWriteBufferLowWaterMark());
- config.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcesserCnt());
+ config.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcessorCnt());
if (isSingleSession) {
config.put(RpcConstants.WORKER_THREAD_NAME, "tube_single_netty_worker-");
} else {
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 8699660..7ec0105 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -269,12 +269,12 @@ public class BaseMessageConsumer implements MessageConsumer {
new TopicProcessor(messageListener, filterConds));
if (oldProcessor != null) {
throw new TubeClientException(new StringBuilder(256).append("Topic=")
- .append(topic).append(" has been subscribered").toString());
+ .append(topic).append(" has been subscribed").toString());
}
return this;
} else {
throw new TubeClientException(new StringBuilder(256).append("Topic=")
- .append(topic).append(" has been subscribered").toString());
+ .append(topic).append(" has been subscribed").toString());
}
}
@@ -331,7 +331,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
if (!consumeSubInfo.isSubscribedTopicContain(partitionKeyItems[1].trim())) {
throw new TubeClientException(new StringBuilder(256)
- .append("Parameter error: not included in subcribed topic list: ")
+ .append("Parameter error: not included in subscribed topic list: ")
.append("partOffsetMap's key is ")
.append(entry.getKey()).append(", subscribed topics are ")
.append(consumeSubInfo.getSubscribedTopics().toString()).toString());
@@ -529,7 +529,7 @@ public class BaseMessageConsumer implements MessageConsumer {
* @return consumer id
* @throws Exception
*/
- private synchronized String generateConsumerID() throws Exception {
+ private String generateConsumerID() throws Exception {
String pidName = ManagementFactory.getRuntimeMXBean().getName();
if (pidName != null && pidName.contains("@")) {
pidName = pidName.split("@")[0];
@@ -711,7 +711,7 @@ public class BaseMessageConsumer implements MessageConsumer {
return rmtDataCache.pushSelect();
}
- protected void pushReqReleasePartiton(String partitionKey,
+ protected void pushReqReleasePartition(String partitionKey,
long usedTime,
boolean isLastPackConsumed) {
rmtDataCache.errReqRelease(partitionKey, usedTime, isLastPackConsumed);
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
index 959f7f2..6212bc3 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
@@ -186,7 +186,7 @@ public class MessageFetchManager {
if (isShutdown()) {
MessageFetchManager.this.pushConsumer
.getBaseConsumer()
- .pushReqReleasePartiton(partition.getPartitionKey(), usedToken, isLastConsumed);
+ .pushReqReleasePartition(partition.getPartitionKey(), usedToken, isLastConsumed);
partSelectResult = null;
break;
}
@@ -198,7 +198,7 @@ public class MessageFetchManager {
.getBaseConsumer().flushLastRequest(partition);
}
MessageFetchManager.this.pushConsumer
- .getBaseConsumer().pushReqReleasePartiton(partition.getPartitionKey(),
+ .getBaseConsumer().pushReqReleasePartition(partition.getPartitionKey(),
usedToken, result);
partSelectResult = null;
continue;
@@ -207,7 +207,7 @@ public class MessageFetchManager {
if (partSelectResult != null) {
MessageFetchManager.this.pushConsumer
.getBaseConsumer()
- .pushReqReleasePartiton(partSelectResult.getPartition().getPartitionKey(),
+ .pushReqReleasePartition(partSelectResult.getPartition().getPartitionKey(),
partSelectResult.getUsedToken(), false);
}
sBuilder.delete(0, sBuilder.length());
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
index 66c08e1..2469bf3 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -91,7 +91,7 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
throw new TubeClientException("null configuration");
}
MasterInfo masterInfo = tubeClientConfig.getMasterInfo();
- if ((masterInfo == null) || masterInfo.getAddrMap4failover().isEmpty()) {
+ if ((masterInfo == null) || masterInfo.getAddrMap4Failover().isEmpty()) {
throw new TubeClientException("Blank MasterInfo content in ClientConfig");
}
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index 6811aef..d3722a9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -122,7 +122,7 @@ public class ProducerManager {
tubeClientConfig.getNettyWriteBufferHighWaterMark());
rpcConfig.put(RpcConstants.NETTY_WRITE_LOW_MARK,
tubeClientConfig.getNettyWriteBufferLowWaterMark());
- rpcConfig.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcesserCnt());
+ rpcConfig.put(RpcConstants.WORKER_COUNT, tubeClientConfig.getRpcConnProcessorCnt());
rpcConfig.put(RpcConstants.WORKER_THREAD_NAME, "tube_netty_worker-");
rpcConfig.put(RpcConstants.CALLBACK_WORKER_COUNT,
tubeClientConfig.getRpcRspCallBackThreadCnt());
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
index 257e9d8..e0544ef 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/SimpleMessageProducer.java
@@ -87,7 +87,7 @@ public class SimpleMessageProducer implements MessageProducer {
this.rpcConfig.put(RpcConstants.NETTY_WRITE_LOW_MARK,
tubeClientConfig.getNettyWriteBufferLowWaterMark());
this.rpcConfig.put(RpcConstants.WORKER_COUNT,
- tubeClientConfig.getRpcConnProcesserCnt());
+ tubeClientConfig.getRpcConnProcessorCnt());
this.rpcConfig.put(RpcConstants.WORKER_THREAD_NAME,
"tube_producer_netty_worker-");
this.rpcConfig.put(RpcConstants.WORKER_MEM_SIZE,
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
index 5c0b893..32e1b21 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeMultiSessionFactoryTest.java
@@ -35,7 +35,7 @@ public class TubeMultiSessionFactoryTest {
@Test
public void testTubeMultiSessionFactory() throws Exception {
TubeClientConfig config = mock(TubeClientConfig.class);
- when(config.getRpcConnProcesserCnt()).thenReturn(1);
+ when(config.getRpcConnProcessorCnt()).thenReturn(1);
when(config.getRpcRspCallBackThreadCnt()).thenReturn(1);
when(config.getMasterInfo()).thenReturn(new MasterInfo("192.168.1.1:18080"));
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
index bbb29ea..66daa28 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/factory/TubeSingleSessionFactoryTest.java
@@ -34,7 +34,7 @@ public class TubeSingleSessionFactoryTest {
@Test
public void testTubeSingleSessionFactory() throws Exception {
TubeClientConfig config = mock(TubeClientConfig.class);
- when(config.getRpcConnProcesserCnt()).thenReturn(1);
+ when(config.getRpcConnProcessorCnt()).thenReturn(1);
when(config.getRpcRspCallBackThreadCnt()).thenReturn(1);
when(config.getMasterInfo()).thenReturn(new MasterInfo("192.168.1.1:18080"));
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
index b82cb05..2e1e211 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/cluster/MasterInfo.java
@@ -106,7 +106,7 @@ public class MasterInfo {
this.masterClusterStr = masterClusterStr;
}
- public Map<String, NodeAddrInfo> getAddrMap4failover() {
+ public Map<String, NodeAddrInfo> getAddrMap4Failover() {
return addrMap4Failover;
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
index 9759528..8c90bb8 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFailoverInvoker.java
@@ -131,7 +131,7 @@ public class RpcServiceFailoverInvoker extends AbstractServiceInvoker {
while (client == null || !client.isReady()) {
String nodeKey =
addressList.get((retryCounter.getAndIncrement() & Integer.MAX_VALUE) % masterNodeCnt);
- NodeAddrInfo nodeAddrInfo = masterInfo.getAddrMap4failover().get(nodeKey);
+ NodeAddrInfo nodeAddrInfo = masterInfo.getAddrMap4Failover().get(nodeKey);
try {
client = clientFactory.getClient(nodeAddrInfo, conf);
} catch (Throwable e) {