You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by jf...@apache.org on 2020/06/01 19:36:08 UTC

[incubator-iotdb] 01/01: Add Metrics Service based on Micrometer.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch feature/improve-monitoring-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit af08f6fc1864ebc13445c3a710a944ffc3ad82af
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Mon Jun 1 21:35:34 2020 +0200

    Add Metrics Service based on Micrometer.
    
    * Prometheus Endpoint is exposed based on Micrometer Prometheus.
    * Add IoTDBRegistry which stores all Metrics as IoTDB timeseries under root._metrics
    * Added Monitoring at several points in the programm
    * Added IService Implementation for new Service
---
 server/pom.xml                                     |  11 +
 .../apache/iotdb/db/metrics2/IoTDBRegistry.java    | 229 +++++++++++++++++++++
 .../iotdb/db/metrics2/IoTDBRegistryConfig.java     |  33 +++
 .../iotdb/db/metrics2/MicrometerServerService.java | 112 ++++++++++
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   7 +
 .../db/writelog/node/ExclusiveWriteLogNode.java    |   5 +
 8 files changed, 400 insertions(+)

diff --git a/server/pom.xml b/server/pom.xml
index 351f6a6..c622007 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -170,6 +170,17 @@
             <artifactId>oauth2-oidc-sdk</artifactId>
             <version>8.3</version>
         </dependency>
+        <!-- Micrometer Metrics -->
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <version>1.1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>io.github.mweirauch</groupId>
+            <artifactId>micrometer-jvm-extras</artifactId>
+            <version>0.2.0</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java
new file mode 100644
index 0000000..5ca656c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metrics2;
+
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.FunctionCounter;
+import io.micrometer.core.instrument.FunctionTimer;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.LongTaskTimer;
+import io.micrometer.core.instrument.Measurement;
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.config.NamingConvention;
+import io.micrometer.core.instrument.cumulative.CumulativeCounter;
+import io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary;
+import io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter;
+import io.micrometer.core.instrument.cumulative.CumulativeFunctionTimer;
+import io.micrometer.core.instrument.cumulative.CumulativeTimer;
+import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
+import io.micrometer.core.instrument.distribution.pause.PauseDetector;
+import io.micrometer.core.instrument.internal.DefaultGauge;
+import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
+import io.micrometer.core.instrument.internal.DefaultMeter;
+import io.micrometer.core.instrument.push.PushMeterRegistry;
+import io.micrometer.core.instrument.util.NamedThreadFactory;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.function.ToDoubleFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Registry which stores all values as IoTDB Time Series.
+ * All time-related values should be milliseconds.
+ *
+ * With default settings, each second all values are written.
+ */
+public class IoTDBRegistry extends PushMeterRegistry {
+
+    private static final Logger logger = LoggerFactory.getLogger(IoTDBRegistry.class);
+
+    private final PlanExecutor executor;
+    private final Planner planner;
+
+    public IoTDBRegistry(IoTDBRegistryConfig config, Clock clock) {
+        super(config, clock);
+
+        planner = new Planner();
+        try {
+            executor = new PlanExecutor();
+        } catch (QueryProcessException e) {
+            throw new RuntimeException("Unable to instantiate IoTDB Metric Backend", e);
+        }
+
+        // Prepare a metric for here...
+        start(new NamedThreadFactory("iotdb-metrics-publisher"));
+    }
+
+    @Override
+    protected <T> Gauge newGauge(Meter.Id id, T obj, ToDoubleFunction<T> valueFunction) {
+        return new DefaultGauge<>(id, obj, valueFunction);
+    }
+
+    @Override
+    protected Counter newCounter(Meter.Id id) {
+        return new CumulativeCounter(id);
+    }
+
+    @Override
+    protected LongTaskTimer newLongTaskTimer(Meter.Id id) {
+        return new DefaultLongTaskTimer(id, clock);
+    }
+
+    @Override
+    protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector) {
+        return new CumulativeTimer(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
+        return new CumulativeDistributionSummary(id, clock, distributionStatisticConfig, scale, true);
+    }
+
+    @Override
+    protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable<Measurement> measurements) {
+        return new DefaultMeter(id, type, measurements);
+    }
+
+    @Override
+    protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
+        return new CumulativeFunctionTimer<T>(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    protected <T> FunctionCounter newFunctionCounter(Meter.Id id, T obj, ToDoubleFunction<T> countFunction) {
+        return new CumulativeFunctionCounter<T>(id, obj, countFunction);
+    }
+
+    @Override
+    protected TimeUnit getBaseTimeUnit() {
+        return TimeUnit.MILLISECONDS;
+    }
+
+    @Override
+    protected DistributionStatisticConfig defaultHistogramConfig() {
+        return DistributionStatisticConfig.DEFAULT;
+    }
+
+
+    @Override
+    protected void publish() {
+        Metrics.timer("metrics.write.timer").record(this::writeMetrics);
+    }
+
+    private void writeMetrics() {
+        for (Meter meter : getMeters()) {
+
+
+            // Add this to an IoTDB Timeseries now
+            final String conventionName = meter.getId().getConventionName(NamingConvention.dot);
+            final List<Tag> conventionTags = meter.getId().getConventionTags(NamingConvention.dot);
+
+            // Now we add this as a timeseries
+            final String query = meter.match(
+                g -> createQuery(conventionName, conventionTags, g.value()),
+                c -> createQuery(conventionName, conventionTags, c.count()),
+                t -> createQueryForTimer(conventionName, conventionTags, t),
+                a -> {throw new NotImplementedException("");},
+                a -> {throw new NotImplementedException("");},
+                tg -> createQuery(conventionName, conventionTags, tg.value(TimeUnit.MILLISECONDS)),
+                fc -> createQuery(conventionName, conventionTags, fc.count()),
+                a -> {throw new NotImplementedException("");},
+                a -> {throw new NotImplementedException("");});
+
+            try {
+                final PhysicalPlan physicalPlan = planner.parseSQLToPhysicalPlan(query);
+                final boolean success = executor.processNonQuery(physicalPlan);
+                if (!success) {
+                    logger.warn("Unable to process metrics query '{}'!", query);
+                }
+            } catch (QueryProcessException | StorageEngineException | StorageGroupNotSetException e) {
+                logger.error("Unable to store metrics", e);
+            }
+        }
+    }
+
+    private String createQuery(String conventionName, List<Tag> conventionTags, double value) {
+        final String tagKeys = conventionTags.stream()
+            .map(Tag::getKey)
+            .collect(Collectors.joining(","));
+        final String tagValues = conventionTags.stream()
+            .map(Tag::getValue)
+            .map(s -> "\"" + s + "\"")
+            .collect(Collectors.joining(","));
+
+        final String escapedPath = conventionName
+            .replace("load", "_load")
+            .replace("count", "_count")
+            .replace("time", "_time");
+
+        final String query;
+        if (tagKeys.isEmpty()) {
+            // In this case we use the last part as measurement
+            final int idx = escapedPath.lastIndexOf(".");
+            String path = escapedPath.substring(0, idx);
+            String name = escapedPath.substring(idx + 1);
+            query = String.format(Locale.ENGLISH, "INSERT INTO root._metrics.%s.%s (timestamp, value) VALUES (NOW(), %f)", path, name, value);
+        } else {
+            query = String.format(Locale.ENGLISH, "INSERT INTO root._metrics.%s (timestamp, %s, value) VALUES (NOW(), %s, %f)", escapedPath, tagKeys, tagValues, value);
+        }
+        return query;
+    }
+
+    private String createQueryForTimer(String conventionName, List<Tag> conventionTags, Timer timer) {
+        final String tagKeys = conventionTags.stream()
+            .map(Tag::getKey)
+            .collect(Collectors.joining(","));
+        final String tagValues = conventionTags.stream()
+            .map(Tag::getValue)
+            .map(s -> "\"" + s + "\"")
+            .collect(Collectors.joining(","));
+
+        final String escapedPath = conventionName
+            .replace("load", "_load")
+            .replace("count", "_count")
+            .replace("time", "_time");
+
+        final String query;
+        if (tagKeys.isEmpty()) {
+            query = String.format(Locale.ENGLISH, "INSERT INTO root._metrics.%s (timestamp, _count, _mean, _max, _total) VALUES (NOW(), %d, %f, %f, %f)", conventionName, timer.count(), timer.mean(TimeUnit.MILLISECONDS), timer.max(TimeUnit.MILLISECONDS), timer.totalTime(TimeUnit.MILLISECONDS));
+        } else {
+            query = String.format(Locale.ENGLISH, "INSERT INTO root._metrics.%s (timestamp, %s, _count, _mean, _max, _total) VALUES (NOW(), %s, %d, %f, %f, %f)", escapedPath, tagKeys, tagValues, timer.count(), timer.mean(TimeUnit.MILLISECONDS), timer.max(TimeUnit.MILLISECONDS), timer.totalTime(TimeUnit.MILLISECONDS));
+        }
+        return query;
+    }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java
new file mode 100644
index 0000000..aa46657
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metrics2;
+
+import io.micrometer.core.instrument.push.PushRegistryConfig;
+
+public interface IoTDBRegistryConfig extends PushRegistryConfig {
+
+    IoTDBRegistryConfig DEFAULT = k -> null;
+
+    @Override
+    default String prefix() {
+        return "iotdb";
+    }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java b/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java
new file mode 100644
index 0000000..af90e86
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metrics2;
+
+import com.sun.net.httpserver.HttpServer;
+import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics;
+import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics;
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
+import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
+import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
+import io.micrometer.core.instrument.binder.system.UptimeMetrics;
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.micrometer.prometheus.PrometheusConfig;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+/**
+ * Metrics Service that is based on micrometer.
+ * It does two things.
+ * First, exposes a Prometheus Endpoint on :8080/metrics.
+ * Second, it logs all collected metrics into IoTDB in the storage group root._metrics.
+ */
+public class MicrometerServerService implements IService {
+
+    private static final MicrometerServerService INSTANCE = new MicrometerServerService();
+
+    private HttpServer server;
+    private final PrometheusMeterRegistry prometheusMeterRegistry;
+
+    public MicrometerServerService() {
+        prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+    }
+
+    public static IService getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void start() throws StartupException {
+        // Define Meter Registry
+        MeterRegistry registry = new CompositeMeterRegistry(Clock.SYSTEM,
+            Arrays.asList(new IoTDBRegistry(IoTDBRegistryConfig.DEFAULT, Clock.SYSTEM), prometheusMeterRegistry));
+        // Set this as default, then users can simply write Metrics.xxx
+        Metrics.addRegistry(registry);
+        // Wire up JVM and Other Default Bindings
+        new ClassLoaderMetrics().bindTo(registry);
+        new JvmMemoryMetrics().bindTo(registry);
+        new JvmGcMetrics().bindTo(registry);
+        new ProcessorMetrics().bindTo(registry);
+        new JvmThreadMetrics().bindTo(registry);
+        new ProcessMemoryMetrics().bindTo(registry);
+        new ProcessThreadMetrics().bindTo(registry);
+        new UptimeMetrics().bindTo(registry);
+        new FileDescriptorMetrics().bindTo(registry);
+
+        // Serve an Endpoint for prometheus
+        try {
+            server = HttpServer.create(new InetSocketAddress(8080), 0);
+            server.createContext("/metrics", httpExchange -> {
+                String response = prometheusMeterRegistry.scrape();
+                httpExchange.sendResponseHeaders(200, response.getBytes().length);
+                try (OutputStream os = httpExchange.getResponseBody()) {
+                    os.write(response.getBytes());
+                }
+            });
+
+            new Thread(server::start).start();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        server.stop(0);
+    }
+
+    @Override
+    public ServiceType getID() {
+        return ServiceType.METRICS2_SERVICE;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0988c17..a86e18a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metrics2.MicrometerServerService;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.rescon.TVListAllocator;
 import org.apache.iotdb.db.sync.receiver.SyncServerManager;
@@ -105,6 +106,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(UpgradeSevice.getINSTANCE());
     registerManager.register(MergeManager.getINSTANCE());
     registerManager.register(StorageEngine.getInstance());
+    registerManager.register(MicrometerServerService.getInstance());
 
     // When registering statMonitor, we should start recovering some statistics
     // with latest values stored
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index e1498bc..c0ab389 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -25,6 +25,7 @@ public enum ServiceType {
   STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""),
   JMX_SERVICE("JMX ServerService", "JMX ServerService"),
   METRICS_SERVICE("Metrics ServerService","MetricsService"),
+  METRICS2_SERVICE("Micrometer based Metrics ServerService","Metrics2Service"),
   RPC_SERVICE("RPC ServerService", "RPCService"),
   MQTT_SERVICE("MQTTService", ""),
   MONITOR_SERVICE("Monitor ServerService", "Monitor"),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d0cff5d..d4dc0af 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import io.micrometer.core.instrument.Metrics;
 import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -178,11 +179,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         IoTDBConstant.GLOBAL_DB_NAME,
         req.getUsername());
 
+
     boolean status;
     IAuthorizer authorizer;
     try {
       authorizer = BasicAuthorizer.getInstance();
     } catch (AuthException e) {
+      Metrics.counter("open.session.request", "status", "INTERNAL_EXCEPTION").increment();
       throw new TException(e);
     }
     String loginMessage = null;
@@ -190,6 +193,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       status = authorizer.login(req.getUsername(), req.getPassword());
     } catch (AuthException e) {
       logger.info("meet error while logging in.", e);
+      Metrics.counter("open.session.request", "status", "INTERNAL_EXCEPTION").increment();
       status = false;
       loginMessage = e.getMessage();
     }
@@ -205,16 +209,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
             TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2);
         resp.setSessionId(sessionId);
+        Metrics.counter("open.session.request", "status", "VERSION_INCOMPATIBLE").increment();
         return resp;
       }
 
       tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully");
+      Metrics.counter("open.session.request", "status", "SUCCESS").increment();
       sessionId = sessionIdGenerator.incrementAndGet();
       sessionIdUsernameMap.put(sessionId, req.getUsername());
       sessionIdZoneIdMap.put(sessionId, config.getZoneID());
       currSessionId.set(sessionId);
     } else {
       tsStatus = RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR);
+      Metrics.counter("open.session.request", "status", "LOGIN_FAILED").increment();
       tsStatus.setMessage(loginMessage);
     }
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus,
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index dda1a7e..397987f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -64,6 +66,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   private long lastFlushedId = 0;
 
   private int bufferedLogNum = 0;
+  private final Counter syncCounter;
 
   /**
    * constructor of ExclusiveWriteLogNode.
@@ -77,6 +80,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) {
       logger.info("create the WAL folder {}." + logDirectory);
     }
+    syncCounter = Metrics.counter("wal.sync.count", "_group", identifier.substring(0, identifier.indexOf("-")));
   }
 
   @Override
@@ -215,6 +219,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   private void sync() {
+    syncCounter.increment();
     lock.writeLock().lock();
     try {
       if (bufferedLogNum == 0) {