You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/11/24 07:38:11 UTC
[skywalking] 01/01: Enhance cache mechanism in the metric persistent process
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch metric-cache
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit dcd30ae61745299a89480a1a1edadd5efba3506f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Nov 24 15:37:59 2022 +0800
Enhance cache mechanism in the metric persistent process
---
docs/en/changes/changes.md | 6 ++
.../analysis/worker/MetricsPersistentWorker.java | 66 +++++++++++++++-------
.../oap/server/core/storage/IMetricsDAO.java | 5 +-
.../server/core/storage/SessionCacheCallback.java | 50 ++++++++++++++++
.../client/elasticsearch/IndexRequestWrapper.java | 13 ++++-
.../client/elasticsearch/UpdateRequestWrapper.java | 14 ++++-
.../library/client/request/InsertRequest.java | 1 +
.../library/client/request/UpdateRequest.java | 1 +
.../storage/plugin/banyandb/BanyanDBBatchDAO.java | 23 ++++++--
.../measure/BanyanDBMeasureInsertRequest.java | 7 +++
.../measure/BanyanDBMeasureUpdateRequest.java | 7 +++
.../banyandb/measure/BanyanDBMetricsDAO.java | 9 +--
.../stream/BanyanDBStreamInsertRequest.java | 5 ++
.../elasticsearch/base/BatchProcessEsDAO.java | 16 +++++-
.../base/MetricIndexRequestWrapper.java} | 35 ++++++------
.../base/MetricIndexUpdateWrapper.java} | 35 ++++++------
.../plugin/elasticsearch/base/MetricsEsDAO.java | 9 +--
.../storage/plugin/jdbc/BatchSQLExecutor.java | 45 ++++++++++++---
.../server/storage/plugin/jdbc/SQLExecutor.java | 36 +++++++-----
.../plugin/jdbc/common/dao/JDBCManagementDAO.java | 2 +-
.../plugin/jdbc/common/dao/JDBCMetricsDAO.java | 9 +--
.../plugin/jdbc/common/dao/JDBCNoneStreamDAO.java | 2 +-
.../plugin/jdbc/common/dao/JDBCRecordDAO.java | 2 +-
.../plugin/jdbc/common/dao/JDBCSQLExecutor.java | 28 +++++----
.../common/dao/JDBCUITemplateManagementDAO.java | 4 +-
.../shardingsphere/ShardingIntegrationTest.java | 2 +-
26 files changed, 318 insertions(+), 114 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index e865f06135..e4f4e855c9 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -119,6 +119,12 @@
* Support dynamic config the sampling strategy in network profiling.
* Zipkin module support BanyanDB storage.
* Zipkin traces query API, sort the result set by start time by default.
+* Enhance cache mechanism in the metric persistent process.
+ * This cache only worked when the metric is accessible(readable) from the database. Once the insert execution delayed
+ due to the scale, the cache would lose efficacy. It only works for the last time update per minute, considering our
+ 25s period.
+ * Fix ID conflicts for all JDBC storage implementation. Due to insert delay, the JDBC storage implementation would
+ still generate another new insert statement.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index 4725f749e2..f4b755e8ed 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -20,27 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
-import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
+import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
@@ -58,7 +60,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
private final Model model;
- private final Map<Metrics, Metrics> context;
+ /**
+ * The session cache holds the latest metrics in-memory.
+ * There are two ways to make sure metrics in-cache,
+ * 1. Metrics is read from the Database through {@link #loadFromStorage(List)}
+ * 2. The built {@link InsertRequest} executed successfully.
+ */
+ private final Map<Metrics, Metrics> sessionCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
@@ -89,7 +97,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
- this.context = new HashMap<>(100);
+ // Due to the cache would be updated depending on final storage implementation,
+ // the map/cache could be updated concurrently.
+ // Set to ConcurrentHashMap in order to avoid HashMap deadlock.
+ // Since 9.4.0
+ this.sessionCache = new ConcurrentHashMap<>(100);
this.enableDatabaseSession = enableDatabaseSession;
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
@@ -192,12 +204,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
metricsList.add(data);
if (metricsList.size() == batchSize) {
- flushDataToStorage(metricsList, prepareRequests);
+ prepareFlushDataToStorage(metricsList, prepareRequests);
}
}
if (metricsList.size() > 0) {
- flushDataToStorage(metricsList, prepareRequests);
+ prepareFlushDataToStorage(metricsList, prepareRequests);
}
if (prepareRequests.size() > 0) {
@@ -209,14 +221,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
return prepareRequests;
}
- private void flushDataToStorage(List<Metrics> metricsList,
- List<PrepareRequest> prepareRequests) {
+ /**
+ * Build given prepareRequests to prepare database flush
+ *
+ * @param metricsList the metrics in the last read from the in-memory aggregated cache.
+ * @param prepareRequests the results for final execution.
+ */
+ private void prepareFlushDataToStorage(List<Metrics> metricsList,
+ List<PrepareRequest> prepareRequests) {
try {
loadFromStorage(metricsList);
long timestamp = System.currentTimeMillis();
for (Metrics metrics : metricsList) {
- Metrics cachedMetrics = context.get(metrics);
+ Metrics cachedMetrics = sessionCache.get(metrics);
if (cachedMetrics != null) {
/*
* If the metrics is not supportUpdate, defined through MetricsExtension#supportUpdate,
@@ -233,12 +251,22 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
continue;
}
cachedMetrics.calculate();
- prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
+ prepareRequests.add(
+ metricsDAO.prepareBatchUpdate(
+ model,
+ cachedMetrics,
+ new SessionCacheCallback(sessionCache, cachedMetrics)
+ ));
nextWorker(cachedMetrics);
cachedMetrics.setLastUpdateTimestamp(timestamp);
} else {
metrics.calculate();
- prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
+ prepareRequests.add(
+ metricsDAO.prepareBatchInsert(
+ model,
+ metrics,
+ new SessionCacheCallback(sessionCache, metrics)
+ ));
nextWorker(metrics);
metrics.setLastUpdateTimestamp(timestamp);
}
@@ -271,7 +299,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
List<Metrics> notInCacheMetrics =
metrics.stream()
.filter(m -> {
- final Metrics cachedValue = context.get(m);
+ final Metrics cachedValue = sessionCache.get(m);
// Not cached or session disabled, the metric could be tagged `not in cache`.
if (cachedValue == null || !enableDatabaseSession) {
return true;
@@ -286,7 +314,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) {
// The expired metrics should be removed from the context and tagged `not in cache` directly.
- context.remove(m);
+ sessionCache.remove(m);
return true;
}
}
@@ -301,9 +329,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics);
if (!enableDatabaseSession) {
// Clear the cache only after results from DB are returned successfully.
- context.clear();
+ sessionCache.clear();
}
- dbMetrics.forEach(m -> context.put(m, m));
+ dbMetrics.forEach(m -> sessionCache.put(m, m));
} catch (final Exception e) {
log.error("Failed to load metrics for merging", e);
}
@@ -312,7 +340,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public void endOfRound() {
if (enableDatabaseSession) {
- Iterator<Metrics> iterator = context.values().iterator();
+ Iterator<Metrics> iterator = sessionCache.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 4320a83cd9..1baee65e4c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -47,7 +47,7 @@ public interface IMetricsDAO extends DAO {
* @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
- InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
+ InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Transfer the given metrics to an executable update statement.
@@ -55,7 +55,7 @@ public interface IMetricsDAO extends DAO {
* @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
- UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
+ UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Calculate the expired status of the metric by given current timestamp, metric and TTL.
@@ -72,4 +72,5 @@ public interface IMetricsDAO extends DAO {
// If the cached metric is older than the TTL indicated.
return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
}
+
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
new file mode 100644
index 0000000000..923da32f72
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/SessionCacheCallback.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+/**
+ * SessionCacheCallback provides a bridge for storage implementations
+ */
+@RequiredArgsConstructor
+public class SessionCacheCallback {
+ private final Map<Metrics, Metrics> sessionCache;
+ private final Metrics metrics;
+ /**
+ * In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs.
+ * This flag would make sure, once one of the generated executions is failure, the whole metric would be removed
+ * from the cache, and would not be added back. As those are executed in a batch mode. The sequence is uncertain.
+ */
+ private volatile boolean isFailed = false;
+
+ public void onInsertCompleted() {
+ if (isFailed) {
+ return;
+ }
+ sessionCache.put(metrics, metrics);
+ }
+
+ public void onUpdateFailure() {
+ sessionCache.remove(metrics);
+ isFailed = true;
+ }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
index 3e2cbf7ad0..e6bd63e28f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@Getter
public class IndexRequestWrapper implements InsertRequest {
- private final IndexRequest request;
+ protected IndexRequest request;
public IndexRequestWrapper(String index, String type, String id,
Map<String, ?> source) {
@@ -35,4 +35,15 @@ public class IndexRequestWrapper implements InsertRequest {
.doc(source)
.build();
}
+
+ /**
+ * Expose an empty constructor to lazy initialization.
+ */
+ protected IndexRequestWrapper() {
+
+ }
+
+ @Override
+ public void onInsertCompleted() {
+ }
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
index 3241aa84e4..dbd9c8c1d2 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/UpdateRequestWrapper.java
@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
@Getter
public class UpdateRequestWrapper implements UpdateRequest {
- private final org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
+ protected org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
public UpdateRequestWrapper(String index, String type, String id,
Map<String, Object> source) {
@@ -34,4 +34,16 @@ public class UpdateRequestWrapper implements UpdateRequest {
.doc(source)
.build();
}
+
+ /**
+ * Expose an empty constructor to lazy initialization.
+ */
+ protected UpdateRequestWrapper() {
+
+ }
+
+ @Override
+ public void onUpdateFailure() {
+
+ }
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
index a9dbdab1af..91ded6a487 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/InsertRequest.java
@@ -18,4 +18,5 @@
package org.apache.skywalking.oap.server.library.client.request;
public interface InsertRequest extends PrepareRequest {
+ void onInsertCompleted();
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
index ea914913c9..2895216971 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/request/UpdateRequest.java
@@ -18,4 +18,5 @@
package org.apache.skywalking.oap.server.library.client.request;
public interface UpdateRequest extends PrepareRequest {
+ void onUpdateFailure();
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index 9564de27da..b3cd0461e0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -18,6 +18,9 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
@@ -29,10 +32,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
private static final Object STREAM_SYNCHRONIZER = new Object();
private static final Object MEASURE_SYNCHRONIZER = new Object();
@@ -69,9 +68,21 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
if (r instanceof BanyanDBStreamInsertRequest) {
return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
} else if (r instanceof BanyanDBMeasureInsertRequest) {
- return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
+ return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite())
+ .whenComplete((v, throwable) -> {
+ if (throwable == null) {
+ // Insert completed
+ ((BanyanDBMeasureInsertRequest) r).onInsertCompleted();
+ }
+ });
} else if (r instanceof BanyanDBMeasureUpdateRequest) {
- return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
+ return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite())
+ .whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ // Update failure
+ ((BanyanDBMeasureUpdateRequest) r).onUpdateFailure();
+ }
+ });
}
return CompletableFuture.completedFuture(null);
}).toArray(CompletableFuture[]::new));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
index 3f965bff6f..bbfe883372 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@RequiredArgsConstructor
@Getter
public class BanyanDBMeasureInsertRequest implements InsertRequest {
private final MeasureWrite measureWrite;
+ private final SessionCacheCallback callback;
+
+ @Override
+ public void onInsertCompleted() {
+ callback.onInsertCompleted();
+ }
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
index 2ff1fae015..b39190aada 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
@RequiredArgsConstructor
@Getter
public class BanyanDBMeasureUpdateRequest implements UpdateRequest {
private final MeasureWrite measureWrite;
+ private final SessionCacheCallback callback;
+
+ @Override
+ public void onUpdateFailure() {
+ callback.onUpdateFailure();
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 0b24eb0e29..0ca07e8cfe 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@@ -77,7 +78,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
}
@Override
- public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+ public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
log.info("prepare to insert {}", model.getName());
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
if (schema == null) {
@@ -89,11 +90,11 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
toStorage.acceptID(metrics.id());
- return new BanyanDBMeasureInsertRequest(toStorage.obtain());
+ return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
}
@Override
- public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+ public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
log.info("prepare to update {}", model.getName());
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
if (schema == null) {
@@ -105,6 +106,6 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
toStorage.acceptID(metrics.id());
- return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
+ return new BanyanDBMeasureUpdateRequest(toStorage.obtain(), callback);
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
index b04e4422d5..15dbc52c53 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -27,4 +27,9 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@Getter
public class BanyanDBStreamInsertRequest implements InsertRequest {
private final StreamWrite streamWrite;
+
+ @Override
+ public void onInsertCompleted() {
+
+ }
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index dc6c47bb19..76e44d67fd 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -75,9 +75,21 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
if (prepareRequest instanceof InsertRequest) {
- return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+ return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest())
+ .whenComplete((v, throwable) -> {
+ if (throwable == null) {
+ // Insert completed
+ ((IndexRequestWrapper) prepareRequest).onInsertCompleted();
+ }
+ });
} else {
- return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+ return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest())
+ .whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ // Update failure
+ ((UpdateRequestWrapper) prepareRequest).onUpdateFailure();
+ }
+ });
}
}).toArray(CompletableFuture[]::new));
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
index 3e2cbf7ad0..ce5cc6ec91 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexRequestWrapper.java
@@ -13,26 +13,29 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper;
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * MetricIndexRequestWrapper wraps the built request wrapper with a new callback.
+ */
+public class MetricIndexRequestWrapper extends IndexRequestWrapper {
+ private final SessionCacheCallback callback;
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
- private final IndexRequest request;
+ public MetricIndexRequestWrapper(IndexRequestWrapper requestWrapper, SessionCacheCallback callback) {
+ this.request = requestWrapper.getRequest();
+ this.callback = callback;
+ }
- public IndexRequestWrapper(String index, String type, String id,
- Map<String, ?> source) {
- request = IndexRequest.builder()
- .index(index)
- .type(type)
- .id(id)
- .doc(source)
- .build();
+ @Override
+ public void onInsertCompleted() {
+ if (callback != null) {
+ callback.onInsertCompleted();
+ }
}
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
similarity index 50%
copy from oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
index 3e2cbf7ad0..34216534b9 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/IndexRequestWrapper.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricIndexUpdateWrapper.java
@@ -13,26 +13,29 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
*/
-package org.apache.skywalking.oap.server.library.client.elasticsearch;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.UpdateRequestWrapper;
-import java.util.Map;
-import lombok.Getter;
-import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * MetricIndexUpdateWrapper wraps the built request wrapper with a new callback.
+ */
+public class MetricIndexUpdateWrapper extends UpdateRequestWrapper {
+ private final SessionCacheCallback callback;
-@Getter
-public class IndexRequestWrapper implements InsertRequest {
- private final IndexRequest request;
+ public MetricIndexUpdateWrapper(UpdateRequestWrapper requestWrapper, SessionCacheCallback callback) {
+ this.request = requestWrapper.getRequest();
+ this.callback = callback;
+ }
- public IndexRequestWrapper(String index, String type, String id,
- Map<String, ?> source) {
- request = IndexRequest.builder()
- .index(index)
- .type(type)
- .id(id)
- .doc(source)
- .build();
+ @Override
+ public void onUpdateFailure() {
+ if (callback != null) {
+ callback.onUpdateFailure();
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 0363669475..12319dae1a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -97,24 +98,24 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
}
@Override
- public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
+ public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) {
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
- return getClient().prepareInsert(modelName, id, builder);
+ return new MetricIndexRequestWrapper(getClient().prepareInsert(modelName, id, builder), callback);
}
@Override
- public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
+ public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) {
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder =
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
- return getClient().prepareUpdate(modelName, id, builder);
+ return new MetricIndexUpdateWrapper(getClient().prepareUpdate(modelName, id, builder), callback);
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
index a3d4848463..4061ae7405 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
@@ -18,15 +18,17 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
/**
* A Batch SQL executor.
@@ -45,32 +47,59 @@ public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
return;
}
String sql = prepareRequests.get(0).toString();
+ List<PrepareRequest> bulkRequest = new ArrayList<>(maxBatchSqlSize);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
int pendingCount = 0;
for (int k = 0; k < prepareRequests.size(); k++) {
SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
sqlExecutor.setParameters(preparedStatement);
preparedStatement.addBatch();
+ bulkRequest.add(sqlExecutor);
if (k > 0 && k % maxBatchSqlSize == 0) {
- executeBatch(preparedStatement, maxBatchSqlSize, sql);
+ executeBatch(preparedStatement, maxBatchSqlSize, sql, bulkRequest);
+ bulkRequest.clear();
pendingCount = 0;
} else {
pendingCount++;
}
}
if (pendingCount > 0) {
- executeBatch(preparedStatement, pendingCount, sql);
+ executeBatch(preparedStatement, pendingCount, sql, bulkRequest);
+ bulkRequest.clear();
}
}
}
- private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+ private void executeBatch(PreparedStatement preparedStatement,
+ int pendingCount,
+ String sql,
+ List<PrepareRequest> bulkRequest) throws SQLException {
long start = System.currentTimeMillis();
- preparedStatement.executeBatch();
+ final int[] executeBatchResults = preparedStatement.executeBatch();
+ boolean isInsert = bulkRequest.get(0) instanceof InsertRequest;
+ for (int i = 0; i < executeBatchResults.length; i++) {
+ if (executeBatchResults[i] == 1 && isInsert) {
+ // Insert successfully.
+ ((InsertRequest) bulkRequest.get(i)).onInsertCompleted();
+ } else if (executeBatchResults[i] == 0 && !isInsert) {
+ // Update Failure.
+ ((UpdateRequest) bulkRequest.get(i)).onUpdateFailure();
+ }
+ }
if (log.isDebugEnabled()) {
long end = System.currentTimeMillis();
long cost = end - start;
log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
}
}
+
+ @Override
+ public void onInsertCompleted() {
+ throw new UnexpectedException("BatchSQLExecutor.onInsertCompleted should not be called");
+ }
+
+ @Override
+ public void onUpdateFailure() {
+ throw new UnexpectedException("BatchSQLExecutor.onUpdateFailure should not be called");
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
index 26c19ee276..266350eb45 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
@@ -25,34 +25,30 @@ import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A SQL executor.
*/
@EqualsAndHashCode(of = "sql")
+@RequiredArgsConstructor
+@Slf4j
public class SQLExecutor implements InsertRequest, UpdateRequest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
-
- private String sql;
- private List<Object> param;
+ private final String sql;
+ private final List<Object> param;
+ private final SessionCacheCallback callback;
@Getter
private List<SQLExecutor> additionalSQLs;
- public SQLExecutor(String sql, List<Object> param) {
- this.sql = sql;
- this.param = param;
- }
-
public void invoke(Connection connection) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
setParameters(preparedStatement);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
+ if (log.isDebugEnabled()) {
+ log.debug("execute sql in batch: {}, parameters: {}", sql, param);
}
preparedStatement.execute();
if (additionalSQLs != null) {
@@ -79,4 +75,16 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
}
additionalSQLs.addAll(sqlExecutors);
}
+
+ @Override
+ public void onInsertCompleted() {
+ if (callback != null)
+ callback.onInsertCompleted();
+ }
+
+ @Override
+ public void onUpdateFailure() {
+ if (callback != null)
+ callback.onUpdateFailure();
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
index 50becff534..2aea8f6ef3 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCManagementDAO.java
@@ -48,7 +48,7 @@ public class JDBCManagementDAO extends JDBCSQLExecutor implements IManagementDAO
}
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder,
- new HashMapConverter.ToStorage());
+ new HashMapConverter.ToStorage(), null);
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
index 3294d80720..5170362df7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetricsDAO.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
@@ -49,12 +50,12 @@ public class JDBCMetricsDAO extends JDBCSQLExecutor implements IMetricsDAO {
}
@Override
- public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
- return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());
+ public SQLExecutor prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+ return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage(), callback);
}
@Override
- public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
- return getUpdateExecutor(model.getName(), metrics, storageBuilder);
+ public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
+ return getUpdateExecutor(model.getName(), metrics, storageBuilder, callback);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
index aaf903b76b..7a87059e08 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCNoneStreamDAO.java
@@ -41,7 +41,7 @@ public class JDBCNoneStreamDAO extends JDBCSQLExecutor implements INoneStreamDAO
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
try (Connection connection = jdbcClient.getConnection()) {
- SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage());
+ SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage(), null);
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
index 1ca36a69fe..f382f746e1 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCRecordDAO.java
@@ -35,6 +35,6 @@ public class JDBCRecordDAO extends JDBCSQLExecutor implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage());
+ return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage(), null);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
index 12321e3629..c15f4768ae 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSQLExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
@@ -116,7 +117,8 @@ public class JDBCSQLExecutor {
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder,
- Convert2Storage<Map<String, Object>> converter) throws IOException {
+ Convert2Storage<Map<String, Object>> converter,
+ SessionCacheCallback callback) throws IOException {
Model model = TableMetaInfo.get(modelName);
storageBuilder.entity2Storage(metrics, converter);
Map<String, Object> objectMap = converter.obtain();
@@ -126,7 +128,7 @@ public class JDBCSQLExecutor {
mainEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
});
SQLExecutor sqlExecutor = buildInsertExecutor(
- modelName, model.getColumns(), metrics, mainEntity);
+ modelName, model.getColumns(), metrics, mainEntity, callback);
//build additional table sql
for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
.getAdditionalTables()
@@ -137,7 +139,7 @@ public class JDBCSQLExecutor {
});
List<SQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
- additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity
+ additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity, callback
);
sqlExecutor.appendAdditionalSQLs(additionalSQLExecutors);
}
@@ -147,7 +149,8 @@ public class JDBCSQLExecutor {
private <T extends StorageData> SQLExecutor buildInsertExecutor(String tableName,
List<ModelColumn> columns,
T metrics,
- Map<String, Object> objectMap) throws IOException {
+ Map<String, Object> objectMap,
+ SessionCacheCallback onCompleteCallback) throws IOException {
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
List<Object> param = new ArrayList<>();
sqlBuilder.append("(?,");
@@ -169,13 +172,14 @@ public class JDBCSQLExecutor {
}
sqlBuilder.append(")");
- return new SQLExecutor(sqlBuilder.toString(), param);
+ return new SQLExecutor(sqlBuilder.toString(), param, onCompleteCallback);
}
private <T extends StorageData> List<SQLExecutor> buildAdditionalInsertExecutor(String tableName,
List<ModelColumn> columns,
T metrics,
- Map<String, Object> objectMap) throws IOException {
+ Map<String, Object> objectMap,
+ SessionCacheCallback callback) throws IOException {
List<SQLExecutor> sqlExecutors = new ArrayList<>();
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
@@ -211,17 +215,18 @@ public class JDBCSQLExecutor {
for (Object object : valueList) {
List<Object> paramCopy = new ArrayList<>(param);
paramCopy.set(position, object);
- sqlExecutors.add(new SQLExecutor(sql, paramCopy));
+ sqlExecutors.add(new SQLExecutor(sql, paramCopy, callback));
}
} else {
- sqlExecutors.add(new SQLExecutor(sql, param));
+ sqlExecutors.add(new SQLExecutor(sql, param, callback));
}
return sqlExecutors;
}
protected <T extends StorageData> SQLExecutor getUpdateExecutor(String modelName, T metrics,
- StorageBuilder<T> storageBuilder) throws IOException {
+ StorageBuilder<T> storageBuilder,
+ SessionCacheCallback callback) throws IOException {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> objectMap = toStorage.obtain();
@@ -236,7 +241,8 @@ public class JDBCSQLExecutor {
if (model.getSqlDBModelExtension().isShardingTable()) {
SQLDatabaseModelExtension.Sharding sharding = model.getSqlDBModelExtension().getSharding().orElseThrow(
() -> new UnexpectedException("Sharding should not be empty."));
- if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(sharding.getTableShardingColumn())) {
+ if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(
+ sharding.getTableShardingColumn())) {
continue;
}
}
@@ -253,6 +259,6 @@ public class JDBCSQLExecutor {
sqlBuilder.append(" WHERE id = ?");
param.add(metrics.id());
- return new SQLExecutor(sqlBuilder.toString(), param);
+ return new SQLExecutor(sqlBuilder.toString(), param, callback);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
index 7097aad725..9267efd21c 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCUITemplateManagementDAO.java
@@ -102,7 +102,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException {
final UITemplate uiTemplate = setting.toEntity();
final SQLExecutor insertExecutor = getInsertExecutor(
- UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage());
+ UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage(), null);
try (Connection connection = h2Client.getConnection()) {
insertExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
@@ -135,7 +135,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
private TemplateChangeStatus executeUpdate(final UITemplate uiTemplate) throws IOException {
final SQLExecutor updateExecutor = getUpdateExecutor(
- UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
+ UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), null);
try (Connection connection = h2Client.getConnection()) {
updateExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
diff --git a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
index c7ac504256..c9776758ec 100644
--- a/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
+++ b/oap-server/server-storage-plugin/storage-shardingsphere-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/shardingsphere/ShardingIntegrationTest.java
@@ -642,7 +642,7 @@ public class ShardingIntegrationTest {
.builder()
.getDeclaredConstructor()
.newInstance());
- jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn);
+ jdbcMetricsDAO.prepareBatchInsert(model, metrics, null).invoke(conn);
}
}
}