You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/10 11:18:24 UTC

[incubator-inlong] branch master updated: [INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 332037f  [INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)
332037f is described below

commit 332037f3c8b4954888bb25101e79088d1dde7e92
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 10 19:17:30 2022 +0800

    [INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)
---
 .../tubemq/corebase/metric/impl/BaseMetric.java    |  54 +++++++++
 .../tubemq/corebase/metric/impl/LongMaxGauge.java  |  63 +++++++++++
 .../tubemq/corebase/metric/impl/LongMinGauge.java  |  63 +++++++++++
 .../corebase/metric/impl/LongOnlineCounter.java    |  67 +++++++++++
 .../corebase/metric/impl/LongStatsCounter.java     |  65 +++++++++++
 .../tubemq/corebase/metric/impl/SinceTime.java     |  46 ++++++++
 .../tubemq/corebase/metric/SimpleMetricTest.java   | 122 +++++++++++++++++++++
 7 files changed, 480 insertions(+)

diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java
new file mode 100644
index 0000000..a0170ad
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import org.apache.inlong.tubemq.corebase.metric.Metric;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+
+/**
+ * BaseMetric, the base class of the metric items; it only carries the item names.
+ *
+ * Each item has a short name and a full name: with a non-empty prefix the
+ *   full name is the prefix + "_" + the short name, otherwise both names are
+ *   the same. Which name is output depends on the metric output requirements.
+ */
+public class BaseMetric implements Metric {
+    // name of the item without the prefix
+    private final String shortName;
+    // name of the item with the prefix applied, if there is one
+    private final String fullName;
+
+    /**
+     * @param metricName  the short name of the metric item
+     * @param prefix      the name prefix, may be empty when no prefix is needed
+     */
+    public BaseMetric(String metricName, String prefix) {
+        this.shortName = metricName;
+        this.fullName = TStringUtils.isEmpty(prefix) ? metricName : prefix + "_" + metricName;
+    }
+
+    @Override
+    public String getShortName() {
+        return shortName;
+    }
+
+    @Override
+    public String getFullName() {
+        return fullName;
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java
new file mode 100644
index 0000000..4d7cdc1
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.Gauge;
+
+/**
+ * LongMaxGauge, a gauge that keeps the largest value reported to it.
+ *
+ * The value is held in an AtomicLong whose initial value is Long.MIN_VALUE.
+ */
+public class LongMaxGauge extends BaseMetric implements Gauge {
+    // current max value, starts from Long.MIN_VALUE
+    private final AtomicLong value = new AtomicLong(Long.MIN_VALUE);
+
+    public LongMaxGauge(String metricName, String prefix) {
+        super(metricName, prefix);
+    }
+
+    /** Raise the stored max value to newValue when newValue is larger. */
+    @Override
+    public void update(long newValue) {
+        long cur = this.value.get();
+        // retry only while newValue is still larger and the CAS lost a race
+        while (newValue > cur && !this.value.compareAndSet(cur, newValue)) {
+            cur = this.value.get();
+        }
+    }
+
+    /** Restore the initial Long.MIN_VALUE (not 0), so later negative updates are kept. */
+    @Override
+    public void clear() {
+        this.value.set(Long.MIN_VALUE);
+    }
+
+    /** Get the current max value; it is Long.MIN_VALUE when nothing larger was reported. */
+    @Override
+    public long getValue() {
+        return this.value.get();
+    }
+
+    /** Get the current max value and restore the initial value in one atomic step. */
+    @Override
+    public long getAndResetValue() {
+        return this.value.getAndSet(Long.MIN_VALUE);
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java
new file mode 100644
index 0000000..ec13ad8
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.Gauge;
+
+/**
+ * LongMinGauge, a gauge that keeps the smallest value reported to it.
+ *
+ * The value is held in an AtomicLong whose initial value is Long.MAX_VALUE.
+ */
+public class LongMinGauge extends BaseMetric implements Gauge {
+    // current min value, starts from Long.MAX_VALUE
+    private final AtomicLong value = new AtomicLong(Long.MAX_VALUE);
+
+    public LongMinGauge(String metricName, String prefix) {
+        super(metricName, prefix);
+    }
+
+    /** Lower the stored min value to newValue when newValue is smaller. */
+    @Override
+    public void update(long newValue) {
+        for (long seen = value.get(); newValue < seen; seen = value.get()) {
+            if (value.compareAndSet(seen, newValue)) {
+                return;
+            }
+        }
+    }
+
+    /** Restore the initial Long.MAX_VALUE. */
+    @Override
+    public void clear() {
+        value.set(Long.MAX_VALUE);
+    }
+
+    /** Get the current min value; it is Long.MAX_VALUE when nothing smaller was reported. */
+    @Override
+    public long getValue() {
+        return value.get();
+    }
+
+    /** Get the current min value and restore the initial value in one atomic step. */
+    @Override
+    public long getAndResetValue() {
+        return value.getAndSet(Long.MAX_VALUE);
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java
new file mode 100644
index 0000000..29e1647
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.inlong.tubemq.corebase.metric.Counter;
+
+/**
+ * LongOnlineCounter, a counter that stores a current ("online") value.
+ *
+ * The value is kept in a LongAdder. Unlike LongMinGauge or LongMaxGauge, reading
+ *   it with getAndResetValue() leaves it untouched; only clear() sets it to zero.
+ */
+public class LongOnlineCounter extends BaseMetric implements Counter {
+    // running value, a new LongAdder starts from zero
+    private final LongAdder value = new LongAdder();
+
+    public LongOnlineCounter(String metricName, String prefix) {
+        super(metricName, prefix);
+    }
+
+    @Override
+    public void incValue() {
+        value.add(1L);
+    }
+
+    @Override
+    public void decValue() {
+        value.add(-1L);
+    }
+
+    @Override
+    public void addValue(long delta) {
+        value.add(delta);
+    }
+
+    @Override
+    public void clear() {
+        value.reset();
+    }
+
+    @Override
+    public long getValue() {
+        return value.sum();
+    }
+
+    @Override
+    public long getAndResetValue() {
+        // this type of metric is not reset on read, so only the sum is returned
+        return value.sum();
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java
new file mode 100644
index 0000000..a019d76
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.inlong.tubemq.corebase.metric.Counter;
+
+/**
+ * LongStatsCounter, a counter used for statistics.
+ *
+ * The value is kept in a LongAdder; getAndResetValue() returns the sum and resets it.
+ */
+public class LongStatsCounter extends BaseMetric implements Counter {
+    // accumulated value, a new LongAdder starts from zero
+    private final LongAdder value = new LongAdder();
+
+    public LongStatsCounter(String metricName, String prefix) {
+        super(metricName, prefix);
+    }
+
+    @Override
+    public void incValue() {
+        value.add(1L);
+    }
+
+    @Override
+    public void decValue() {
+        value.add(-1L);
+    }
+
+    @Override
+    public void addValue(long delta) {
+        value.add(delta);
+    }
+
+    @Override
+    public void clear() {
+        value.reset();
+    }
+
+    @Override
+    public long getValue() {
+        return value.sum();
+    }
+
+    @Override
+    public long getAndResetValue() {
+        return value.sumThenReset();
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java
new file mode 100644
index 0000000..3a90eb1
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * SinceTime, store the start time (System.currentTimeMillis()) of the metric items set.
+ *
+ * After snapshot() is called it needs to be updated via getAndResetSinceTime() or reset().
+ */
+public class SinceTime extends BaseMetric {
+    // set here instead of calling the overridable reset() from the constructor
+    private final AtomicLong sinceTime = new AtomicLong(System.currentTimeMillis());
+
+    public SinceTime(String metricName, String prefix) {
+        super(metricName, prefix);
+    }
+
+    public long getSinceTime() {
+        return this.sinceTime.get();
+    }
+
+    public long getAndResetSinceTime() {
+        return this.sinceTime.getAndSet(System.currentTimeMillis());
+    }
+
+    public void reset() {
+        this.sinceTime.set(System.currentTimeMillis());
+    }
+}
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java
new file mode 100644
index 0000000..11783dc
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.apache.inlong.tubemq.corebase.metric.impl.BaseMetric;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongMaxGauge;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongMinGauge;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SimpleMetricTest {
+
+    @Test
+    public void testBaseMetric() {
+        String name = "test";
+        String prefix = "prefix_1";
+        String noPrefix = "";
+        // naming rules of the base class, with and without a prefix
+        BaseMetric base1 = new BaseMetric(name, prefix);
+        Assert.assertEquals(name, base1.getShortName());
+        Assert.assertEquals(prefix + "_" + name, base1.getFullName());
+        BaseMetric base2 = new BaseMetric(name, noPrefix);
+        Assert.assertEquals(name, base2.getShortName());
+        Assert.assertEquals(name, base2.getFullName());
+        // the same naming rules must hold for a sub-class
+        LongMaxGauge gauge1 = new LongMaxGauge(name, prefix);
+        Assert.assertEquals(name, gauge1.getShortName());
+        Assert.assertEquals(prefix + "_" + name, gauge1.getFullName());
+        LongMaxGauge gauge2 = new LongMaxGauge(name, noPrefix);
+        Assert.assertEquals(name, gauge2.getShortName());
+        Assert.assertEquals(name, gauge2.getFullName());
+        // SinceTime: the naming rules first, then the reset of the start time
+        SinceTime since1 = new SinceTime(name, prefix);
+        Assert.assertEquals(name, since1.getShortName());
+        Assert.assertEquals(prefix + "_" + name, since1.getFullName());
+        SinceTime since2 = new SinceTime(name, noPrefix);
+        Assert.assertEquals(name, since2.getShortName());
+        Assert.assertEquals(name, since2.getFullName());
+        // wait a while so that the reset time differs from the creation time
+        ThreadUtils.sleep(50);
+        Assert.assertNotEquals(since1.getAndResetSinceTime(), since1.getSinceTime());
+    }
+
+    @Test
+    public void testLongMetric() {
+        // LongMaxGauge keeps the largest update, also after getAndResetValue()
+        LongMaxGauge max = new LongMaxGauge("max", "long");
+        max.update(3);
+        max.update(100);
+        max.update(50);
+        Assert.assertEquals(100, max.getValue());
+        max.update(5000);
+        Assert.assertEquals(5000, max.getAndResetValue());
+        max.update(200);
+        max.update(300);
+        max.update(50);
+        Assert.assertEquals(300, max.getValue());
+        // LongMinGauge keeps the smallest update, also after getAndResetValue()
+        LongMinGauge min = new LongMinGauge("min", "long");
+        min.update(3);
+        min.update(100);
+        min.update(50);
+        Assert.assertEquals(3, min.getValue());
+        min.update(1);
+        Assert.assertEquals(1, min.getAndResetValue());
+        min.update(500);
+        min.update(600);
+        min.update(30);
+        Assert.assertEquals(30, min.getValue());
+        // LongOnlineCounter keeps its value across getAndResetValue()
+        LongOnlineCounter online = new LongOnlineCounter("online", "long");
+        online.incValue();
+        online.incValue();
+        online.decValue();
+        Assert.assertEquals(1, online.getValue());
+        online.incValue();
+        Assert.assertEquals(2, online.getAndResetValue());
+        online.decValue();
+        Assert.assertEquals(1, online.getValue());
+        online.addValue(5);
+        Assert.assertEquals(6, online.getValue());
+        online.addValue(-5);
+        Assert.assertEquals(1, online.getValue());
+        online.clear();
+        Assert.assertEquals(0, online.getValue());
+        // LongStatsCounter restarts from zero after getAndResetValue()
+        LongStatsCounter stats = new LongStatsCounter("stats", "long");
+        stats.incValue();
+        stats.incValue();
+        stats.decValue();
+        Assert.assertEquals(1, stats.getValue());
+        stats.incValue();
+        Assert.assertEquals(2, stats.getAndResetValue());
+        stats.decValue();
+        Assert.assertEquals(-1, stats.getValue());
+        stats.addValue(5);
+        Assert.assertEquals(4, stats.getValue());
+        stats.addValue(-3);
+        Assert.assertEquals(1, stats.getValue());
+        stats.clear();
+        Assert.assertEquals(0, stats.getValue());
+    }
+}