You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/30 11:51:20 UTC
[inlong] branch master updated: [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8c62a3ca3 [INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
8c62a3ca3 is described below
commit 8c62a3ca3dafefdc1cf5f8dbb42e56d4632eb68a
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Sep 30 19:51:14 2022 +0800
[INLONG-6076][Audit] Bugs in the ClickHouseService.processOutput() (#6077)
---
.../inlong/audit/service/ClickHouseService.java | 45 +++++++++++-----------
1 file changed, 22 insertions(+), 23 deletions(-)
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
index 24042d5aa..6d98973a2 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ClickHouseService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ClickHouseService
@@ -54,7 +55,7 @@ public class ClickHouseService implements InsertData, AutoCloseable {
private LinkedBlockingQueue<ClickHouseDataPo> batchQueue;
private AtomicBoolean needBatchOutput = new AtomicBoolean(false);
private AtomicInteger batchCounter = new AtomicInteger(0);
-
+ private AtomicLong lastCheckTime = new AtomicLong(System.currentTimeMillis());
private Connection conn;
/**
@@ -77,17 +78,11 @@ public class ClickHouseService implements InsertData, AutoCloseable {
Class.forName(chConfig.getDriver());
this.reconnect();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ LOG.error("ClickHouseService start failure!", e);
}
- // timer
- long currentTime = System.currentTimeMillis();
- // batch output interval
- timerService.scheduleWithFixedDelay(() -> needBatchOutput.compareAndSet(false, true),
- currentTime + chConfig.getBatchIntervalMs(),
- chConfig.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
- // batch output process
- timerService.scheduleWithFixedDelay(() -> processOutput(),
- currentTime + chConfig.getProcessIntervalMs(),
+ // start timer
+ timerService.scheduleWithFixedDelay(this::processOutput,
+ chConfig.getProcessIntervalMs(),
chConfig.getProcessIntervalMs(), TimeUnit.MILLISECONDS);
}
@@ -95,14 +90,15 @@ public class ClickHouseService implements InsertData, AutoCloseable {
* processOutput
*/
private void processOutput() {
- if (!this.needBatchOutput.get()) {
+ if (!this.needBatchOutput.get()
+ && (System.currentTimeMillis() - lastCheckTime.get() < chConfig.getBatchIntervalMs())) {
return;
}
// output
try (PreparedStatement pstat = this.conn.prepareStatement(INSERT_SQL)) {
- // insert data
- ClickHouseDataPo data = this.batchQueue.poll();
int counter = 0;
+ // output data to clickhouse
+ ClickHouseDataPo data = this.batchQueue.poll();
while (data != null) {
pstat.setString(1, data.getIp());
pstat.setString(2, data.getDockerId());
@@ -124,20 +120,23 @@ public class ClickHouseService implements InsertData, AutoCloseable {
this.conn.commit();
counter = 0;
}
+ data = this.batchQueue.poll();
}
- this.batchCounter.set(0);
- pstat.executeBatch();
- this.conn.commit();
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ if (counter > 0) {
+ pstat.executeBatch();
+ this.conn.commit();
+ }
+ } catch (Exception e1) {
+ LOG.error("Execute output to clickhouse failure!", e1);
+ // re-connect clickhouse
try {
this.reconnect();
- } catch (SQLException e1) {
- LOG.error(e1.getMessage(), e1);
+ } catch (SQLException e2) {
+ LOG.error("Re-connect clickhouse failure!", e2);
}
}
-
- // recover
+ // recover flag
+ lastCheckTime.set(System.currentTimeMillis());
this.needBatchOutput.compareAndSet(true, false);
}