You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/10 09:13:22 UTC
[incubator-inlong] branch master updated: [INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4d236f7 [INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)
4d236f7 is described below
commit 4d236f75606e83774c47b3d1822936d325c1dba8
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 10 17:13:17 2022 +0800
[INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)
---
.../consumer/SimpleClientBalanceConsumer.java | 7 +--
.../inlong/tubemq/corebase/metric/Counter.java | 40 +++++++++++++
.../inlong/tubemq/corebase/metric/Counting.java | 44 +++++++++++++++
.../inlong/tubemq/corebase/metric/Gauge.java | 31 +++++++++++
.../inlong/tubemq/corebase/metric/Histogram.java | 65 ++++++++++++++++++++++
.../inlong/tubemq/corebase/metric/Metric.java | 39 +++++++++++++
.../inlong/tubemq/corebase/metric/MetricSet.java | 53 ++++++++++++++++++
.../nodeproducer/ProducerInfoHolder.java | 4 +-
8 files changed, 277 insertions(+), 6 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 9e3375c..12a10f9 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -269,16 +269,15 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
.append(this.consumerId)
.append(" was already shutdown, do nothing...").toString());
-
}
break;
- case 1: {
+ case 1:
+ default: {
logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
.append(this.consumerId)
.append(" is starting, please wait a minute!").toString());
}
- break;
}
return;
}
@@ -518,7 +517,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
break;
}
if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0)
- && (System.currentTimeMillis() - startTime
+ && ((System.currentTimeMillis() - startTime)
>= consumerConfig.getPullConsumeReadyWaitPeriodMs())) {
result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
return result.isSuccess();
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java
new file mode 100644
index 0000000..c41564b
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which require only increment and decrement.
+ */
+public interface Counter extends Counting, Metric {
+
+ /**
+ * Increment the value.
+ */
+ void incValue();
+
+ /**
+ * Decrement the value.
+ */
+ void decValue();
+
+ /**
+ * Add delta to the value.
+ * @param delta delta value
+ */
+ void addValue(long delta);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java
new file mode 100644
index 0000000..0c061d9
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which have counts.
+ */
+public interface Counting {
+
+ /**
+ * Clear the current value.
+ */
+ void clear();
+
+ /**
+ * Get the current value.
+ *
+ * @return the current value
+ */
+ long getValue();
+
+ /**
+ * Get and reset the current value.
+ *
+ * @return the current value
+ */
+ long getAndResetValue();
+
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java
new file mode 100644
index 0000000..9ade29a
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which has only one value at a specific point in time.
+ */
+public interface Gauge extends Counting, Metric {
+
+ /**
+ * Update a new value.
+ *
+ * @param newValue a new recorded value
+ */
+ void update(long newValue);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java
new file mode 100644
index 0000000..96c859d
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import java.util.Map;
+
+/**
+ * An interface for metric types which calculates the distribution of values.
+ */
+public interface Histogram extends Metric {
+
+ /**
+ * Update a new value.
+ *
+ * @param newValue a new recorded value
+ */
+ void update(long newValue);
+
+ /**
+ * Get the current recorded values.
+ *
+ * @param keyValMap the read result, the key is metric item's full name
+ * @param includeZero whether to include the details item with value 0
+ */
+ void getValue(Map<String, Long> keyValMap, boolean includeZero);
+
+ /**
+ * Get the current recorded values.
+ *
+ * @param strBuff string buffer, json format
+ * @param includeZero whether to include the details item with value 0
+ */
+ void getValue(StringBuilder strBuff, boolean includeZero);
+
+ /**
+ * Get the current recorded values and reset to zero.
+ *
+ * @param keyValMap the read result, the key is metric item's full name
+ * @param includeZero whether to include the details item with value 0
+ */
+ void snapShort(Map<String, Long> keyValMap, boolean includeZero);
+
+ /**
+ * Get the current recorded values and reset to zero.
+ *
+ * @param strBuff string buffer, json format
+ * @param includeZero whether to include the details item with value 0
+ */
+ void snapShort(StringBuilder strBuff, boolean includeZero);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java
new file mode 100644
index 0000000..0f30b44
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric item.
+ */
+public interface Metric {
+
+ /**
+ * Get the metric short name
+ *
+ * @return the metric short name
+ */
+ String getShortName();
+
+ /**
+ * Get the metric full name
+ *
+ * @return the full metric name
+ */
+ String getFullName();
+
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java
new file mode 100644
index 0000000..15e71c7
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import java.util.Map;
+
+/**
+ * An interface for metric item set.
+ */
+public interface MetricSet {
+ /**
+ * Get the current recorded values.
+ *
+ * @param keyValMap the read result, the key is metric item's full name
+ */
+ void getValue(Map<String, Long> keyValMap);
+
+ /**
+ * Get the current recorded values.
+ *
+ * @param strBuff string buffer, json format
+ */
+ void getValue(StringBuilder strBuff);
+
+ /**
+ * Get the current recorded values and reset to zero.
+ *
+ * @param keyValMap the read result, the key is metric item's full name
+ */
+ void snapShort(Map<String, Long> keyValMap);
+
+ /**
+ * Get the current recorded values and reset to zero.
+ *
+ * @param strBuff string buffer, json format
+ */
+ void snapShort(StringBuilder strBuff);
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 22a578c..7aef3a6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -32,8 +32,8 @@ public class ProducerInfoHolder {
}
public void setProducerInfo(String producerId,
- Set<String> topicSet,
- String host, boolean overTLS) {
+ Set<String> topicSet,
+ String host, boolean overTLS) {
if (producerInfoMap.put(producerId,
new ProducerInfo(producerId, topicSet, host, overTLS)) == null) {
MasterMetricsHolder.incProducerCnt();