You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/30 11:51:20 UTC

[inlong] branch master updated: [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c62a3ca3 [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
8c62a3ca3 is described below

commit 8c62a3ca3dafefdc1cf5f8dbb42e56d4632eb68a
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 30 19:51:14 2022 +0800

    [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
---
 .../inlong/audit/service/ClickHouseService.java    | 45 +++++++++++-----------
 1 file changed, 22 insertions(+), 23 deletions(-)

diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index 24042d5aa..6d98973a2 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * ClickHouseService
@@ -54,7 +55,7 @@ public class ClickHouseService implements InsertData, AutoCloseable {
     private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
     private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
     private AtomicInteger batchCounter = new AtomicInteger(0);
-
+    private AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis());
     private Connection conn;
 
     /**
@@ -77,17 +78,11 @@ public class ClickHouseService implements InsertData, AutoCloseable {
             Class.forName(chConfig.getDriver());
             this.reconnect();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("ClickHouseService start failure!", e);
         }
-        // timer
-        long currentTime = System.currentTimeMillis();
-        // batch output interval
-        timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true),
-                currentTime + chConfig.getBatchIntervalMs(),
-                chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
-        // batch output process
-        timerService.scheduleWithFixedDelay(() -> processOutput(),
-                currentTime + chConfig.getProcessIntervalMs(),
+        // start timer
+        timerService.scheduleWithFixedDelay(this::processOutput,
+                chConfig.getProcessIntervalMs(),
                 chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
     }
 
@@ -95,14 +90,15 @@ public class ClickHouseService implements InsertData, AutoCloseable {
      * processOutput
      */
     private void processOutput() {
-        if (!this.needBatchOutput.get()) {
+        if (!this.needBatchOutput.get()
+                && (System.currentTimeMillis() - lastCheckTime.get() < chConfig.getBatchIntervalMs())) {
             return;
         }
         // output
         try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) {
-            // insert data
-            ClickHouseDataPo data = this.batchQueue.poll();
             int counter = 0;
+            // output data to clickhouse
+            ClickHouseDataPo data = this.batchQueue.poll();
             while (data != null) {
                 pstat.setString(1, data.getIp());
                 pstat.setString(2, data.getDockerId());
@@ -124,20 +120,23 @@ public class ClickHouseService implements InsertData, AutoCloseable {
                     this.conn.commit();
                     counter = 0;
                 }
+                data = this.batchQueue.poll();
             }
-            this.batchCounter.set(0);
-            pstat.executeBatch();
-            this.conn.commit();
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            if (counter > 0) {
+                pstat.executeBatch();
+                this.conn.commit();
+            }
+        } catch (Exception e1) {
+            LOG.error("Execute output to clickhouse failure!", e1);
+            // re-connect clickhouse
             try {
                 this.reconnect();
-            } catch (SQLException e1) {
-                LOG.error(e1.getMessage(), e1);
+            } catch (SQLException e2) {
+                LOG.error("Re-connect clickhouse failure!", e2);
             }
         }
-
-        // recover
+        // recover flag
+        lastCheckTime.set(System.currentTimeMillis());
         this.needBatchOutput.compareAndSet(true, false);
     }