You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/10 11:18:24 UTC
[incubator-inlong] branch master updated: [INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 332037f [INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)
332037f is described below
commit 332037f3c8b4954888bb25101e79088d1dde7e92
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Feb 10 19:17:30 2022 +0800
[INLONG-2445][TubeMQ] Add Gauge and Counter implementation classes (#2446)
---
.../tubemq/corebase/metric/impl/BaseMetric.java | 54 +++++++++
.../tubemq/corebase/metric/impl/LongMaxGauge.java | 63 +++++++++++
.../tubemq/corebase/metric/impl/LongMinGauge.java | 63 +++++++++++
.../corebase/metric/impl/LongOnlineCounter.java | 67 +++++++++++
.../corebase/metric/impl/LongStatsCounter.java | 65 +++++++++++
.../tubemq/corebase/metric/impl/SinceTime.java | 46 ++++++++
.../tubemq/corebase/metric/SimpleMetricTest.java | 122 +++++++++++++++++++++
7 files changed, 480 insertions(+)
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java
new file mode 100644
index 0000000..a0170ad
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/BaseMetric.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import org.apache.inlong.tubemq.corebase.metric.Metric;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+
+/**
+ * BaseMetric, stores general information about a metric item, such as the metric name.
+ *
+ * A metric has a base (short) name and a prefix; the full name is composed of
+ * the prefix + "_" + the base name, and either name can be selected
+ * for output according to the metric output requirements.
+ */
+public class BaseMetric implements Metric {
+ // metric short name: the metricName passed to the constructor, without the prefix
+ private final String shortName;
+ // metric full name: prefix + "_" + metricName, or metricName alone when TStringUtils.isEmpty(prefix)
+ private final String fullName;
+
+ public BaseMetric(String metricName, String prefix) {
+ this.shortName = metricName;
+ if (TStringUtils.isEmpty(prefix)) {
+ this.fullName = metricName;
+ } else {
+ this.fullName = prefix + "_" + metricName;
+ }
+ }
+
+ @Override
+ public String getShortName() {
+ return this.shortName;
+ }
+
+ @Override
+ public String getFullName() {
+ return this.fullName;
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java
new file mode 100644
index 0000000..4d7cdc1
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMaxGauge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.Gauge;
+
+/**
+ * LongMaxGauge, stores the maximum of the values reported through update().
+ *
+ * The maximum is held in an AtomicLong; Long.MIN_VALUE is its initial and reset state.
+ */
+public class LongMaxGauge extends BaseMetric implements Gauge {
+ // current max value; starts at Long.MIN_VALUE so that any reported value can replace it
+ private final AtomicLong value = new AtomicLong(Long.MIN_VALUE);
+
+ public LongMaxGauge(String metricName, String prefix) {
+ super(metricName, prefix);
+ }
+
+ @Override
+ public void update(long newValue) {
+ while (true) {
+ long cur = this.value.get();
+ if (newValue <= cur) {
+ break;
+ }
+ if (this.value.compareAndSet(cur, newValue)) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ this.value.set(Long.MIN_VALUE); // same state as after construction, so negative values still register
+ }
+
+ @Override
+ public long getValue() {
+ return this.value.get();
+ }
+
+ @Override
+ public long getAndResetValue() {
+ return this.value.getAndSet(Long.MIN_VALUE);
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java
new file mode 100644
index 0000000..ec13ad8
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongMinGauge.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.corebase.metric.Gauge;
+
+/**
+ * LongMinGauge, stores the minimum of the values reported through update().
+ *
+ * The minimum is held in an AtomicLong; Long.MAX_VALUE is its initial and reset state.
+ */
+public class LongMinGauge extends BaseMetric implements Gauge {
+ // current min value; starts at Long.MAX_VALUE so that any reported value can replace it
+ private final AtomicLong value = new AtomicLong(Long.MAX_VALUE);
+
+ public LongMinGauge(String metricName, String prefix) {
+ super(metricName, prefix);
+ }
+
+ @Override
+ public void update(long newValue) {
+ while (true) {
+ long cur = this.value.get();
+ if (newValue >= cur) {
+ break;
+ }
+ if (this.value.compareAndSet(cur, newValue)) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void clear() {
+ this.value.set(Long.MAX_VALUE);
+ }
+
+ @Override
+ public long getValue() {
+ return this.value.get();
+ }
+
+ @Override
+ public long getAndResetValue() {
+ return this.value.getAndSet(Long.MAX_VALUE);
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java
new file mode 100644
index 0000000..29e1647
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongOnlineCounter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.inlong.tubemq.corebase.metric.Counter;
+
+/**
+ * LongOnlineCounter, stores current value information.
+ *
+ * The differences between it and LongMinGauge or LongMaxGauge are that:
+ * 1. the value is stored in a LongAdder;
+ * 2. getAndResetValue() only reads the value and does not reset it (clear() still does).
+ */
+public class LongOnlineCounter extends BaseMetric implements Counter {
+ // current value, accumulated through increment(), decrement() and add()
+ private final LongAdder value = new LongAdder();
+
+ public LongOnlineCounter(String metricName, String prefix) {
+ super(metricName, prefix);
+ }
+
+ @Override
+ public void incValue() {
+ this.value.increment();
+ }
+
+ @Override
+ public void decValue() {
+ this.value.decrement();
+ }
+
+ @Override
+ public void addValue(long delta) {
+ this.value.add(delta);
+ }
+
+ @Override
+ public void clear() {
+ this.value.reset();
+ }
+
+ @Override
+ public long getValue() {
+ return this.value.sum();
+ }
+
+ @Override
+ public long getAndResetValue() {
+ return this.value.sum();
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java
new file mode 100644
index 0000000..a019d76
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/LongStatsCounter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.inlong.tubemq.corebase.metric.Counter;
+
+/**
+ * LongStatsCounter, stores current value information.
+ *
+ * The metric is used for statistics; the value is stored in a LongAdder, and getAndResetValue() resets it.
+ */
+public class LongStatsCounter extends BaseMetric implements Counter {
+ // current value, accumulated through increment(), decrement() and add()
+ private final LongAdder value = new LongAdder();
+
+ public LongStatsCounter(String metricName, String prefix) {
+ super(metricName, prefix);
+ }
+
+ @Override
+ public void incValue() {
+ this.value.increment();
+ }
+
+ @Override
+ public void decValue() {
+ this.value.decrement();
+ }
+
+ @Override
+ public void addValue(long delta) {
+ this.value.add(delta);
+ }
+
+ @Override
+ public void clear() {
+ this.value.reset();
+ }
+
+ @Override
+ public long getValue() {
+ return this.value.sum();
+ }
+
+ @Override
+ public long getAndResetValue() {
+ return this.value.sumThenReset();
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java
new file mode 100644
index 0000000..3a90eb1
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/metric/impl/SinceTime.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * SinceTime, stores the start time of the metric items set.
+ *
+ * After calling the snapshot() function, it needs to be updated to the snapshot time.
+ */
+public class SinceTime extends BaseMetric {
+ private final AtomicLong sinceTime = new AtomicLong(); // a System.currentTimeMillis() timestamp
+
+ public SinceTime(String metricName, String prefix) {
+ super(metricName, prefix);
+ reset(); // start counting from the construction time
+ }
+
+ public long getSinceTime() {
+ return this.sinceTime.get();
+ }
+
+ public long getAndResetSinceTime() {
+ return this.sinceTime.getAndSet(System.currentTimeMillis());
+ }
+
+ public void reset() {
+ this.sinceTime.set(System.currentTimeMillis());
+ }
+}
diff --git a/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java
new file mode 100644
index 0000000..11783dc
--- /dev/null
+++ b/inlong-tubemq/tubemq-core/src/test/java/org/apache/inlong/tubemq/corebase/metric/SimpleMetricTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.corebase.metric;
+
+import org.apache.inlong.tubemq.corebase.metric.impl.BaseMetric;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongMaxGauge;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongMinGauge;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongOnlineCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.LongStatsCounter;
+import org.apache.inlong.tubemq.corebase.metric.impl.SinceTime;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SimpleMetricTest {
+
+ @Test
+ public void testBaseMetric() {
+ String metricName = "test";
+ String prefix1 = "prefix_1";
+ String prefix2 = "";
+ // test BaseMetric class: full name with a prefix, and with an empty prefix
+ BaseMetric metric1 = new BaseMetric(metricName, prefix1);
+ Assert.assertEquals(metricName, metric1.getShortName());
+ Assert.assertEquals(prefix1 + "_" + metricName, metric1.getFullName());
+ BaseMetric metric2 = new BaseMetric(metricName, prefix2);
+ Assert.assertEquals(metricName, metric2.getShortName());
+ Assert.assertEquals(metricName, metric2.getFullName());
+ // test sub-class: the names must follow the same rules as BaseMetric
+ LongMaxGauge metric3 = new LongMaxGauge(metricName, prefix1);
+ Assert.assertEquals(metricName, metric3.getShortName());
+ Assert.assertEquals(prefix1 + "_" + metricName, metric3.getFullName());
+ LongMaxGauge metric4 = new LongMaxGauge(metricName, prefix2);
+ Assert.assertEquals(metricName, metric4.getShortName());
+ Assert.assertEquals(metricName, metric4.getFullName());
+ // test SinceTime: check the names, then that getAndResetSinceTime() changes the stored time
+ SinceTime sinceTime1 = new SinceTime(metricName, prefix1);
+ Assert.assertEquals(metricName, sinceTime1.getShortName());
+ Assert.assertEquals(prefix1 + "_" + metricName, sinceTime1.getFullName());
+ SinceTime sinceTime2 = new SinceTime(metricName, prefix2);
+ Assert.assertEquals(metricName, sinceTime2.getShortName());
+ Assert.assertEquals(metricName, sinceTime2.getFullName());
+ ThreadUtils.sleep(50);
+ long since11 = sinceTime1.getAndResetSinceTime();
+ Assert.assertNotEquals(since11, sinceTime1.getSinceTime());
+ }
+
+ @Test
+ public void testLongMetric() {
+ // test LongMaxGauge: keeps the largest update; getAndResetValue() starts a new round
+ LongMaxGauge maxGauge = new LongMaxGauge("max", "long");
+ maxGauge.update(3);
+ maxGauge.update(100);
+ maxGauge.update(50);
+ Assert.assertEquals(100, maxGauge.getValue());
+ maxGauge.update(5000);
+ Assert.assertEquals(5000, maxGauge.getAndResetValue());
+ maxGauge.update(200);
+ maxGauge.update(300);
+ maxGauge.update(50);
+ Assert.assertEquals(300, maxGauge.getValue());
+ // test LongMinGauge: keeps the smallest update; getAndResetValue() starts a new round
+ LongMinGauge minGauge = new LongMinGauge("min", "long");
+ minGauge.update(3);
+ minGauge.update(100);
+ minGauge.update(50);
+ Assert.assertEquals(3, minGauge.getValue());
+ minGauge.update(1);
+ Assert.assertEquals(1, minGauge.getAndResetValue());
+ minGauge.update(500);
+ minGauge.update(600);
+ minGauge.update(30);
+ Assert.assertEquals(30, minGauge.getValue());
+ // test LongOnlineCounter: getAndResetValue() must not reset the value, clear() must
+ LongOnlineCounter onlineCounter = new LongOnlineCounter("online", "long");
+ onlineCounter.incValue();
+ onlineCounter.incValue();
+ onlineCounter.decValue();
+ Assert.assertEquals(1, onlineCounter.getValue());
+ onlineCounter.incValue();
+ Assert.assertEquals(2, onlineCounter.getAndResetValue());
+ onlineCounter.decValue();
+ Assert.assertEquals(1, onlineCounter.getValue());
+ onlineCounter.addValue(5);
+ Assert.assertEquals(6, onlineCounter.getValue());
+ onlineCounter.addValue(-5);
+ Assert.assertEquals(1, onlineCounter.getValue());
+ onlineCounter.clear();
+ Assert.assertEquals(0, onlineCounter.getValue());
+ // test LongStatsCounter: getAndResetValue() resets the value, so counting restarts from zero
+ LongStatsCounter statsCounter = new LongStatsCounter("stats", "long");
+ statsCounter.incValue();
+ statsCounter.incValue();
+ statsCounter.decValue();
+ Assert.assertEquals(1, statsCounter.getValue());
+ statsCounter.incValue();
+ Assert.assertEquals(2, statsCounter.getAndResetValue());
+ statsCounter.decValue();
+ Assert.assertEquals(-1, statsCounter.getValue());
+ statsCounter.addValue(5);
+ Assert.assertEquals(4, statsCounter.getValue());
+ statsCounter.addValue(-3);
+ Assert.assertEquals(1, statsCounter.getValue());
+ statsCounter.clear();
+ Assert.assertEquals(0, statsCounter.getValue());
+ }
+}