You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2021/01/14 01:41:49 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-511]Replace the conditional operator (?:) with mid()
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new b3b7803 [TUBEMQ-511]Replace the conditional operator (?:) with mid()
b3b7803 is described below
commit b3b78037ff86db8a0539a2928a43942770e39ab8
Author: gosonzhang <go...@tencent.com>
AuthorDate: Wed Jan 13 19:51:52 2021 +0800
[TUBEMQ-511]Replace the conditional operator (?:) with mid()
---
.../apache/tubemq/server/broker/msgstore/MessageStore.java | 13 +++++--------
.../tubemq/server/broker/offset/DefaultOffsetManager.java | 10 ++++------
.../apache/tubemq/server/broker/web/BrokerAdminServlet.java | 13 +++++--------
.../java/org/apache/tubemq/server/master/MasterConfig.java | 3 ++-
.../org/apache/tubemq/server/tools/cli/CliConsumer.java | 2 +-
.../org/apache/tubemq/server/tools/cli/CliProducer.java | 2 +-
6 files changed, 18 insertions(+), 25 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 8f650b4..1987c32 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder;
@@ -133,10 +134,8 @@ public class MessageStore implements Closeable {
this.writeCacheMaxSize = validAndGetMemCacheSize(topicMetadata.getMemCacheMsgSize());
this.writeCacheFlushIntvl = topicMetadata.getMemCacheFlushIntvl();
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
- memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
- ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
- fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
- ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
+ memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000));
+ fileMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 8000, 13500));
memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
@@ -408,10 +407,8 @@ public class MessageStore implements Closeable {
unflushDataHold.set(topicMetadata.getUnflushDataHold());
maxFileValidDurMs.set(parseDeletePolicy(topicMetadata.getDeletePolicy()));
int tmpIndexReadCnt = tubeConfig.getIndexTransCount() * partitionNum;
- memMaxIndexReadCnt.set(tmpIndexReadCnt <= 6000
- ? 6000 : (Math.min(tmpIndexReadCnt, 10000)));
- fileMaxIndexReadCnt.set(tmpIndexReadCnt < 8000
- ? 8000 : (Math.min(tmpIndexReadCnt, 13500)));
+ memMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 6000, 10000));
+ fileMaxIndexReadCnt.set(MixedUtils.mid(tmpIndexReadCnt, 8000, 13500));
memMaxFilterIndexReadCnt.set(memMaxIndexReadCnt.get() * 2);
fileMaxFilterIndexReadCnt.set(fileMaxIndexReadCnt.get() * 3);
fileLowReqMaxFilterIndexReadCnt.set(fileMaxFilterIndexReadCnt.get() * 10);
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
index 0e91dc2..50c8bd3 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corebase.utils.Tuple3;
@@ -121,8 +122,7 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
|| (readStatus == TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS)) {
long adjOffset = indexMaxOffset;
if (readStatus != TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS) {
- adjOffset = Math.min(reqOffset, indexMaxOffset);
- adjOffset = Math.max(adjOffset, indexMinOffset);
+ adjOffset = MixedUtils.mid(reqOffset, indexMinOffset, indexMaxOffset);
}
regInfo.getAndSetOffset(adjOffset);
}
@@ -287,10 +287,8 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse
long reSetOffset, final String modifier) {
long oldOffset = -1;
if (store != null) {
- long firstOffset = store.getIndexMinOffset();
- long lastOffset = store.getIndexMaxOffset();
- reSetOffset = reSetOffset < firstOffset
- ? firstOffset : Math.min(reSetOffset, lastOffset);
+ reSetOffset = MixedUtils.mid(reSetOffset,
+ store.getIndexMinOffset(), store.getIndexMaxOffset());
String offsetCacheKey = getOffsetCacheKey(topic, partitionId);
getAndResetTmpOffset(group, offsetCacheKey);
OffsetStorageInfo regInfo =
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index d35ba91..dd4bbdf 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpServletRequest;
import org.apache.tubemq.corebase.TokenConstants;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corebase.utils.Tuple3;
@@ -959,11 +960,9 @@ public class BrokerAdminServlet extends AbstractWebHandler {
if (store == null) {
continue;
}
- long firstOffset = store.getIndexMinOffset();
- long lastOffset = store.getIndexMaxOffset();
// adjust reset offset value
- adjOffset = offsetTuple.getF0() < firstOffset
- ? firstOffset : Math.min(offsetTuple.getF0(), lastOffset);
+ adjOffset = MixedUtils.mid(offsetTuple.getF0(),
+ store.getIndexMinOffset(), store.getIndexMaxOffset());
result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset));
}
}
@@ -1059,10 +1058,8 @@ public class BrokerAdminServlet extends AbstractWebHandler {
if (store == null) {
continue;
}
- long firstOffset = store.getIndexMinOffset();
- long lastOffset = store.getIndexMaxOffset();
- adjOffset = entry.getValue() < firstOffset
- ? firstOffset : Math.min(entry.getValue(), lastOffset);
+ adjOffset = MixedUtils.mid(entry.getValue(),
+ store.getIndexMinOffset(), store.getIndexMaxOffset());
offsetVals.add(new Tuple3<>(topicName, partitionId, adjOffset));
}
if (offsetVals.isEmpty()) {
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
index b112d66..c2a8007 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/MasterConfig.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.config.TLSConfig;
import org.apache.tubemq.corebase.utils.AddressUtils;
+import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corerpc.RpcConstants;
import org.apache.tubemq.server.common.TServerConstants;
@@ -467,7 +468,7 @@ public class MasterConfig extends AbstractFileConfig {
}
if (TStringUtils.isNotBlank(masterConf.get("rebalanceParallel"))) {
int tmpParallel = this.getInt(masterConf, "rebalanceParallel");
- this.rebalanceParallel = (tmpParallel <= 0) ? 1 : (Math.min(tmpParallel, 20));
+ this.rebalanceParallel = MixedUtils.mid(tmpParallel, 1, 20);
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
index c8938a6..52f0800 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliConsumer.java
@@ -177,7 +177,7 @@ public class CliConsumer extends CliAbstractBase {
String fetchThreadCntStr = cli.getOptionValue(CliArgDef.FETCHTHREADS.longOpt);
if (TStringUtils.isNotBlank(fetchThreadCntStr)) {
int tmpFetchThreadCnt = Integer.parseInt(fetchThreadCntStr);
- tmpFetchThreadCnt = (tmpFetchThreadCnt < 1) ? 1 : Math.min(tmpFetchThreadCnt, 100);
+ tmpFetchThreadCnt = MixedUtils.mid(tmpFetchThreadCnt, 1, 100);
fetchThreadCnt = tmpFetchThreadCnt;
}
return true;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
index a6e7e38..8a01e29 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java
@@ -152,7 +152,7 @@ public class CliProducer extends CliAbstractBase {
String sendThreadCntStr = cli.getOptionValue(CliArgDef.SENDTHREADS.longOpt);
if (TStringUtils.isNotBlank(sendThreadCntStr)) {
int tmpThreadCnt = Integer.parseInt(sendThreadCntStr);
- tmpThreadCnt = (tmpThreadCnt < 1) ? 1 : Math.min(tmpThreadCnt, 200);
+ tmpThreadCnt = MixedUtils.mid(tmpThreadCnt, 1, 200);
sendThreadCnt = tmpThreadCnt;
}
String rpcTimeoutStr = cli.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);