You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/10 09:13:22 UTC

[incubator-inlong] branch master updated: [INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d236f7  [INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)
4d236f7 is described below

commit 4d236f75606e83774c47b3d1822936d325c1dba8
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 10 17:13:17 2022 +0800

    [INLONG-2433][TubeMQ] Abstract metrics' interface (#2434)
---
 .../consumer/SimpleClientBalanceConsumer.java      |  7 +--
 .../inlong/tubemq/corebase/metric/Counter.java     | 40 +++++++++++++
 .../inlong/tubemq/corebase/metric/Counting.java    | 44 +++++++++++++++
 .../inlong/tubemq/corebase/metric/Gauge.java       | 31 +++++++++++
 .../inlong/tubemq/corebase/metric/Histogram.java   | 65 ++++++++++++++++++++++
 .../inlong/tubemq/corebase/metric/Metric.java      | 39 +++++++++++++
 .../inlong/tubemq/corebase/metric/MetricSet.java   | 53 ++++++++++++++++++
 .../nodeproducer/ProducerInfoHolder.java           |  4 +-
 8 files changed, 277 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 9e3375c..12a10f9 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -269,16 +269,15 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                     logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
                             .append(this.consumerId)
                             .append(" was already shutdown, do nothing...").toString());
-
                 }
                 break;
 
-                case 1: {
+                case 1:
+                default: {
                     logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
                             .append(this.consumerId)
                             .append(" is starting, please wait a minute!").toString());
                 }
-                break;
             }
             return;
         }
@@ -518,7 +517,7 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
                 break;
             }
             if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0)
-                    && (System.currentTimeMillis() - startTime
+                    && ((System.currentTimeMillis() - startTime)
                     >= consumerConfig.getPullConsumeReadyWaitPeriodMs())) {
                 result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
                 return result.isSuccess();
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java
new file mode 100644
index 0000000..c41564b
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which require only increment and decrement.
+ */
+public interface Counter extends Counting, Metric {
+
+    /**
+     * Increment the value.
+     */
+    void incValue();
+
+    /**
+     * Decrement the value.
+     */
+    void decValue();
+
+    /**
+     * Add delta to the value.
+     * @param delta  delta value
+     */
+    void addValue(long delta);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java
new file mode 100644
index 0000000..0c061d9
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Counting.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which have counts.
+ */
+public interface Counting {
+
+    /**
+     * Clear the current value.
+     */
+    void clear();
+
+    /**
+     * Get the current value.
+     *
+     * @return   the current value
+     */
+    long getValue();
+
+    /**
+     * Get and reset the current value.
+     *
+     * @return   the current value
+     */
+    long getAndResetValue();
+
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java
new file mode 100644
index 0000000..9ade29a
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Gauge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for metric types which have only one value at a specific point in time.
+ */
+public interface Gauge extends Counting, Metric {
+
+    /**
+     * Update a new value.
+     *
+     * @param newValue a new recorded value
+     */
+    void update(long newValue);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java
new file mode 100644
index 0000000..96c859d
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Histogram.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import java.util.Map;
+
+/**
+ * An interface for metric types which calculate the distribution of values.
+ */
+public interface Histogram extends Metric {
+
+    /**
+     * Update a new value.
+     *
+     * @param newValue a new recorded value
+     */
+    void update(long newValue);
+
+    /**
+     * Get the current recorded values.
+     *
+     * @param keyValMap     the read result, the key is metric item's full name
+     * @param includeZero   whether to include detail items whose value is 0
+     */
+    void getValue(Map<String, Long> keyValMap, boolean includeZero);
+
+    /**
+     * Get the current recorded values.
+     *
+     * @param strBuff       string buffer, json format
+     * @param includeZero   whether to include detail items whose value is 0
+     */
+    void getValue(StringBuilder strBuff, boolean includeZero);
+
+    /**
+     * Get the current recorded values and reset to zero.
+     *
+     * @param keyValMap     the read result, the key is metric item's full name
+     * @param includeZero   whether to include detail items whose value is 0
+     */
+    void snapShort(Map<String, Long> keyValMap, boolean includeZero);
+
+    /**
+     * Get the current recorded values and reset to zero.
+     *
+     * @param strBuff       string buffer, json format
+     * @param includeZero   whether to include detail items whose value is 0
+     */
+    void snapShort(StringBuilder strBuff, boolean includeZero);
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java
new file mode 100644
index 0000000..0f30b44
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/Metric.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+/**
+ * An interface for a metric item.
+ */
+public interface Metric {
+
+    /**
+     * Get the metric short name
+     *
+     * @return  the metric short name
+     */
+    String getShortName();
+
+    /**
+     * Get the metric full name
+     *
+     * @return  the full metric name
+     */
+    String getFullName();
+
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java
new file mode 100644
index 0000000..15e71c7
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/MetricSet.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import java.util.Map;
+
+/**
+ * An interface for a set of metric items.
+ */
+public interface MetricSet {
+    /**
+     * Get the current recorded values.
+     *
+     * @param keyValMap     the read result, the key is metric item's full name
+     */
+    void getValue(Map<String, Long> keyValMap);
+
+    /**
+     * Get the current recorded values.
+     *
+     * @param strBuff       string buffer, json format
+     */
+    void getValue(StringBuilder strBuff);
+
+    /**
+     * Get the current recorded values and reset to zero.
+     *
+     * @param keyValMap     the read result, the key is metric item's full name
+     */
+    void snapShort(Map<String, Long> keyValMap);
+
+    /**
+     * Get the current recorded values and reset to zero.
+     *
+     * @param strBuff       string buffer, json format
+     */
+    void snapShort(StringBuilder strBuff);
+}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
index 22a578c..7aef3a6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeproducer/ProducerInfoHolder.java
@@ -32,8 +32,8 @@ public class ProducerInfoHolder {
     }
 
     public void setProducerInfo(String producerId,
-                                        Set<String> topicSet,
-                                        String host, boolean overTLS) {
+                                Set<String> topicSet,
+                                String host, boolean overTLS) {
         if (producerInfoMap.put(producerId,
                 new ProducerInfo(producerId, topicSet, host, overTLS)) == null) {
             MasterMetricsHolder.incProducerCnt();