You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/07 11:59:58 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-495]Code implementation adjustment based on SpotBugs check
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new aed064e [TUBEMQ-495]Code implementation adjustment based on SpotBugs check
aed064e is described below
commit aed064ea3fcefe7b167796c651c5ab2eae2e593a
Author: gosonzhang <go...@tencent.com>
AuthorDate: Thu Jan 7 11:02:29 2021 +0800
[TUBEMQ-495]Code implementation adjustment based on SpotBugs check
---
.../tubemq/client/config/TubeClientConfig.java | 2 +-
.../client/consumer/MessageFetchManager.java | 5 +++-
.../tubemq/client/consumer/RmtDataCache.java | 4 +--
.../apache/tubemq/corebase/utils/AddressUtils.java | 12 ++++----
.../org/apache/tubemq/corebase/utils/Tuple2.java | 12 ++++++--
.../org/apache/tubemq/corebase/utils/Tuple3.java | 18 ++++++++++--
.../tubemq/corerpc/AbstractServiceInvoker.java | 2 +-
.../apache/tubemq/corerpc/RpcServiceFactory.java | 4 +--
.../tubemq/example/MAMessageProducerExample.java | 6 ++--
.../tubemq/example/MessageProducerExample.java | 6 ++--
.../server/broker/offset/DefaultOffsetManager.java | 24 ++++++++--------
.../server/broker/stats/GroupCountService.java | 2 +-
.../server/broker/utils/GroupOffsetInfo.java | 4 +--
.../server/broker/web/BrokerAdminServlet.java | 15 ++++------
.../tubemq/server/common/webbase/WebFieldType.java | 4 +--
.../org/apache/tubemq/server/master/TMaster.java | 32 +++++++++++-----------
.../master/bdbstore/DefaultBdbStoreService.java | 4 +--
.../nodemanage/nodebroker/BrokerInfoHolder.java | 4 +--
.../tubemq/server/tools/cli/CliProducer.java | 6 ++--
19 files changed, 91 insertions(+), 75 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
index b1bd3b1..437b171 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java
@@ -101,7 +101,7 @@ public class TubeClientConfig {
throw new IllegalArgumentException("Illegal parameter: masterAddrInfo is null!");
}
this.masterInfo = masterInfo.clone();
- String iPv4LocalAddress = AddressUtils.getIPV4LocalAddress();
+ AddressUtils.getIPV4LocalAddress();
}
@Deprecated
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
index 6afb8a9..80279ff 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java
@@ -216,7 +216,10 @@ public class MessageFetchManager {
sBuilder.delete(0, sBuilder.length());
}
fetchWorkerStatusMap.put(curThreadId, 2);
- MessageFetchManager.this.pushConsumer.processRequest(partSelectResult, sBuilder);
+ if (partSelectResult != null) {
+ MessageFetchManager.this.pushConsumer.processRequest(
+ partSelectResult, sBuilder);
+ }
}
fetchWorkerStatusMap.remove(curThreadId);
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 745f127..a222913 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -423,8 +423,8 @@ public class RmtDataCache implements Closeable {
if (frozenTime == null) {
if (waitDlt > 10) {
TimeoutTask timeoutTask = new TimeoutTask(partitionKey);
- timeouts.put(partitionKey,
- timer.newTimeout(timeoutTask, waitDlt, TimeUnit.MILLISECONDS));
+ timeouts.put(partitionKey, timer.newTimeout(
+ timeoutTask, waitDlt, TimeUnit.MILLISECONDS));
} else {
releaseIdlePartition(partitionKey);
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java
index 5a76af2..bc14e5f 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java
@@ -58,12 +58,12 @@ public class AddressUtils {
try {
Tuple2<Boolean, String> result =
getValidIPV4Address(allInterface.nextElement(), currLocalHost);
- if (result.f0) {
+ if (result.getF0()) {
localIPAddress = currLocalHost;
return true;
}
if (TStringUtils.isEmpty(fstV4IP)) {
- fstV4IP = result.f1;
+ fstV4IP = result.getF1();
}
} catch (Throwable e) {
//
@@ -153,8 +153,8 @@ public class AddressUtils {
try {
Tuple2<Boolean, String> result =
getValidIPV4Address(enumeration.nextElement(), null);
- if (result.f0) {
- tmpAdress = result.f1;
+ if (result.getF0()) {
+ tmpAdress = result.getF1();
break;
}
} catch (Throwable e) {
@@ -196,8 +196,8 @@ public class AddressUtils {
try {
Tuple2<Boolean, String> result =
getValidIPV4Address(oneInterface, null);
- if (result.f0) {
- localIPAddress = result.f1;
+ if (result.getF0()) {
+ localIPAddress = result.getF1();
return localIPAddress;
}
} catch (Throwable e) {
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
index f5626f8..048452f 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java
@@ -20,9 +20,9 @@ package org.apache.tubemq.corebase.utils;
public class Tuple2<T0, T1> {
/** Field 0 of the tuple. */
- public T0 f0 = null;
+ private T0 f0 = null;
/** Field 1 of the tuple. */
- public T1 f1 = null;
+ private T1 f1 = null;
/**
* Creates a new tuple where all fields are null.
@@ -50,4 +50,12 @@ public class Tuple2<T0, T1> {
this.f0 = value0;
this.f1 = value1;
}
+
+ public T0 getF0() {
+ return f0;
+ }
+
+ public T1 getF1() {
+ return f1;
+ }
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
index a2d98c3..579b425 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java
@@ -20,11 +20,11 @@ package org.apache.tubemq.corebase.utils;
public class Tuple3<T0, T1, T2> {
/** Field 0 of the tuple. */
- public T0 f0 = null;
+ private T0 f0 = null;
/** Field 1 of the tuple. */
- public T1 f1 = null;
+ private T1 f1 = null;
/** Field 2 of the tuple. */
- public T2 f2 = null;
+ private T2 f2 = null;
/**
* Creates a new tuple where all fields are null.
@@ -45,4 +45,16 @@ public class Tuple3<T0, T1, T2> {
this.f1 = value1;
this.f2 = value2;
}
+
+ public T0 getF0() {
+ return f0;
+ }
+
+ public T1 getF1() {
+ return f1;
+ }
+
+ public T2 getF2() {
+ return f2;
+ }
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java
index 8103387..1154f12 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java
@@ -64,7 +64,7 @@ public abstract class AbstractServiceInvoker implements InvocationHandler {
// client.close();
}
- private class RpcResponseCallback implements Callback {
+ private static class RpcResponseCallback implements Callback {
private Callback chainedCallback;
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
index 8652c2e..cd16253 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java
@@ -470,7 +470,7 @@ public class RpcServiceFactory {
.append(masterInfo.getMasterClusterStr()).toString();
}
- private class ServiceHolder<T> implements Shutdownable {
+ private static class ServiceHolder<T> implements Shutdownable {
private T service;
private AbstractServiceInvoker invoker;
@@ -489,7 +489,7 @@ public class RpcServiceFactory {
}
}
- private class ConnectionNode {
+ private static class ConnectionNode {
private Class clazzType;
private NodeAddrInfo addressInfo;
private RpcConfig config;
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
index ea546c9..814dbad 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java
@@ -191,12 +191,12 @@ public class MAMessageProducerExample {
long millis = System.currentTimeMillis();
roundIndex = (int) (sentCount++ % targetCnt);
Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.f0, sendData);
+ Message message = new Message(target.getF0(), sendData);
message.setAttrKeyVal("index", String.valueOf(sentCount));
message.setAttrKeyVal("dataTime", String.valueOf(millis));
- if (target.f1 != null) {
+ if (target.getF1() != null) {
filterMsgCount.incrementAndGet();
- message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
+ message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
}
try {
// next line sends message synchronously, which is not recommended
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
index 7c29a63..2b806b5 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java
@@ -104,12 +104,12 @@ public final class MessageProducerExample {
while (msgCount < 0 || sentCount < msgCount) {
roundIndex = (int) (sentCount++ % targetCnt);
Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.f0, body.getBytes());
+ Message message = new Message(target.getF0(), body.getBytes());
long currTimeMillis = System.currentTimeMillis();
message.setAttrKeyVal("index", String.valueOf(sentCount));
message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis));
- if (target.f1 != null) {
- message.putSystemHeader(target.f1, sdf.format(new Date(currTimeMillis)));
+ if (target.getF1() != null) {
+ message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis)));
}
try {
// 1.1 next line sends message synchronously, which is not recommended
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index f052375..0e91dc2 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -439,10 +439,12 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
getOffsetCacheKey(entry.getKey(), partitionId);
OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey);
Long tmpOffset = tmpPartOffsetMap.get(offsetCacheKey);
+ if (tmpOffset == null) {
+ tmpOffset = 0L;
+ }
if (offsetInfo != null) {
offsetMap.put(partitionId,
- new Tuple2<>(offsetInfo.getOffset(),
- (tmpOffset == null ? 0 : tmpOffset)));
+ new Tuple2<>(offsetInfo.getOffset(), tmpOffset));
}
}
if (!offsetMap.isEmpty()) {
@@ -473,21 +475,21 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
for (String group : groups) {
for (Tuple3<String, Integer, Long> tuple3 : topicPartOffsets) {
if (tuple3 == null
- || tuple3.f0 == null
- || tuple3.f1 == null
- || tuple3.f2 == null) {
+ || tuple3.getF0() == null
+ || tuple3.getF1() == null
+ || tuple3.getF2() == null) {
continue;
}
// set offset value
- offsetCacheKey = getOffsetCacheKey(tuple3.f0, tuple3.f1);
+ offsetCacheKey = getOffsetCacheKey(tuple3.getF0(), tuple3.getF1());
getAndResetTmpOffset(group, offsetCacheKey);
OffsetStorageInfo regInfo = loadOrCreateOffset(group,
- tuple3.f0, tuple3.f1, offsetCacheKey, 0);
- oldOffset = regInfo.getAndSetOffset(tuple3.f2);
+ tuple3.getF0(), tuple3.getF1(), offsetCacheKey, 0);
+ oldOffset = regInfo.getAndSetOffset(tuple3.getF2());
changed = true;
logger.info(strBuidler
.append("[Offset Manager] Update offset by modifier=")
- .append(modifier).append(",reset offset=").append(tuple3.f2)
+ .append(modifier).append(",reset offset=").append(tuple3.getF2())
.append(",old offset=").append(oldOffset)
.append(",updated offset=").append(regInfo.getOffset())
.append(",group=").append(group)
@@ -710,9 +712,5 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
.append("-").append(partitionId).toString();
}
- private String getOffsetCacheKey(String topic, String partitionId) {
- return new StringBuilder(256).append(topic)
- .append("-").append(partitionId).toString();
- }
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
index b0de32e..7a7c056 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java
@@ -139,7 +139,7 @@ public class GroupCountService extends AbstractDaemonService implements CountSer
countSet.refCnt.decrementAndGet();
}
- private class CountSet {
+ private static class CountSet {
public AtomicLong refCnt = new AtomicLong(0);
public ConcurrentHashMap<String, CountItem> counterItem =
new ConcurrentHashMap<>();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
index a0c7215..fc0ebfa 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java
@@ -48,8 +48,8 @@ public class GroupOffsetInfo {
public void setConsumeOffsetInfo(Tuple2<Long, Long> offsetInfo) {
if (offsetInfo != null) {
- this.curOffset = offsetInfo.f0;
- this.flightOffset = offsetInfo.f1;
+ this.curOffset = offsetInfo.getF0();
+ this.flightOffset = offsetInfo.getF1();
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index 30d7247..5b11be1 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -459,7 +459,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
return;
}
Set<String> filterCondStrSet = (Set<String>) result.retData1;
- sBuilder = broker.getBrokerServiceServer()
+ broker.getBrokerServiceServer()
.getMessageSnapshot(topicName, partitionId, msgCount, filterCondStrSet, sBuilder);
}
@@ -811,8 +811,7 @@ public class BrokerAdminServlet extends AbstractWebHandler {
// transfer offset format
resetOffsets = buildOffsetResetInfo(topicSet);
}
- boolean changed = broker.getOffsetManager().modifyGroupOffset(
- groupNameSet, resetOffsets, modifier);
+ broker.getOffsetManager().modifyGroupOffset(groupNameSet, resetOffsets, modifier);
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
@@ -875,10 +874,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets =
broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap);
// transfer offset format
- List<Tuple3<String, Integer, Long>> resetOffsets =
- buildOffsetResetInfo(srcGroupOffsets);
- boolean changed = broker.getOffsetManager().modifyGroupOffset(
- tgtGroupNameSet, resetOffsets, modifier);
+ List<Tuple3<String, Integer, Long>> resetOffsets = buildOffsetResetInfo(srcGroupOffsets);
+ broker.getOffsetManager().modifyGroupOffset(tgtGroupNameSet, resetOffsets, modifier);
// builder return result
sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
}
@@ -965,8 +962,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
long firstOffset = store.getIndexMinOffset();
long lastOffset = store.getIndexMaxOffset();
// adjust reset offset value
- adjOffset = offsetTuple.f0 < firstOffset
- ? firstOffset : Math.min(offsetTuple.f0, lastOffset);
+ adjOffset = offsetTuple.getF0() < firstOffset
+ ? firstOffset : Math.min(offsetTuple.getF0(), lastOffset);
result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset));
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
index 2f32cb1..a3e037f 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java
@@ -32,8 +32,8 @@ public enum WebFieldType {
JSONTYPE(8, "Json");
- public int value;
- public String desc;
+ private int value;
+ private String desc;
WebFieldType(int value, String desc) {
this.value = value;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index cc6f681..4299bc7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -1956,12 +1956,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
- || tupleInfo.f0 == null
- || tupleInfo.f1 == null) {
+ || tupleInfo.getF0() == null
+ || tupleInfo.getF1() == null) {
continue;
}
List<String> blackTopicList =
- this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
+ this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.getF0());
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1979,7 +1979,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
currentPartMap = new HashMap<>();
}
}
- if (tupleInfo.f1.isOverTLS()) {
+ if (tupleInfo.getF1().isOverTLS()) {
for (Partition currentPart : currentPartMap.values()) {
if (!blackTopicList.contains(currentPart.getTopic())) {
boolean found = false;
@@ -1995,8 +1995,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.f0,
- tupleInfo.f1.isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
+ tupleInfo.getF1().isOverTLS(), currentPart));
}
for (Partition finalPart : finalPartList) {
if (!blackTopicList.contains(finalPart.getTopic())) {
@@ -2012,7 +2012,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
continue;
}
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.f0, true, finalPart));
+ tupleInfo.getF0(), true, finalPart));
}
}
} else {
@@ -2020,14 +2020,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (!finalPartList.contains(currentPart))) {
deletedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.f0, false, currentPart));
+ tupleInfo.getF0(), false, currentPart));
}
}
for (Partition finalPart : finalPartList) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.f0, false, finalPart));
+ tupleInfo.getF0(), false, finalPart));
}
}
}
@@ -2090,13 +2090,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
- || tupleInfo.f0 == null
- || tupleInfo.f1 == null) {
+ || tupleInfo.getF0() == null
+ || tupleInfo.getF1() == null) {
continue;
}
// allocate partitions to consumers
List<String> blackTopicList =
- this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
+ this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.getF0());
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -2120,15 +2120,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.f0,
- tupleInfo.f1.isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
+ tupleInfo.getF1().isOverTLS(), currentPart));
}
}
for (Partition finalPart : finalPartMap.values()) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
- addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0,
- tupleInfo.f1.isOverTLS(), finalPart));
+ addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.getF0(),
+ tupleInfo.getF1().isOverTLS(), finalPart));
}
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
index 7622c57..b201cde 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java
@@ -930,9 +930,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server {
if (!isPrimaryNodeActive()) {
if ((replicas4Transfer != null) && (!replicas4Transfer.isEmpty())) {
logger.info("start transferMaster to replicas: " + replicas4Transfer);
- if ((replicas4Transfer != null) && (!replicas4Transfer.isEmpty())) {
- repEnv.transferMaster(replicas4Transfer, 5, TimeUnit.MINUTES);
- }
+ repEnv.transferMaster(replicas4Transfer, 5, TimeUnit.MINUTES);
logger.info("transferMaster end...");
} else {
throw new Exception("The replicate nodes is empty!");
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
index dec535b..c9fdaea 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java
@@ -323,7 +323,7 @@ public class BrokerInfoHolder {
}
}
- public class BrokerAbnInfo {
+ public static class BrokerAbnInfo {
private int brokerId;
private int abnStatus; // 0 normal , -100 read abnormal, -1 write abnormal, -101 r & w abnormal
private long firstRepTime;
@@ -364,7 +364,7 @@ public class BrokerInfoHolder {
}
}
- public class BrokerFbdInfo {
+ public static class BrokerFbdInfo {
private int brokerId;
private int befStatus;
private int newStatus;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
index f544829..a6e7e38 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -268,10 +268,10 @@ public class CliProducer extends CliAbstractBase {
try {
long millis = System.currentTimeMillis();
Tuple2<String, String> target = topicSendRounds.get(roundIndex);
- Message message = new Message(target.f0, sentData);
- if (target.f1 != null) {
+ Message message = new Message(target.getF0(), sentData);
+ if (target.getF1() != null) {
// if include filter, add filter item
- message.putSystemHeader(target.f1, sdf.format(new Date(millis)));
+ message.putSystemHeader(target.getF1(), sdf.format(new Date(millis)));
}
// use sync or async process
if (syncProduction) {