You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/07/20 14:59:57 UTC
[skywalking] branch master updated: Improve OAP server performance.
(#3127)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 6338de3 Improve OAP server performance. (#3127)
6338de3 is described below
commit 6338de392fe988e4bf46e8a6c5aadb92b946ddab
Author: 彭勇升 pengys <pe...@apache.org>
AuthorDate: Sat Jul 20 22:59:52 2019 +0800
Improve OAP server performance. (#3127)
* Improve OAP server performance.
---
.../analysis/worker/MetricsPersistentWorker.java | 76 +++++++++++------
.../analysis/worker/MetricsStreamProcessor.java | 8 +-
.../analysis/worker/RecordStreamProcessor.java | 4 +-
.../core/analysis/worker/TopNStreamProcessor.java | 2 +-
.../register/worker/InventoryStreamProcessor.java | 2 +-
.../oap/server/core/storage/IMetricsDAO.java | 3 +-
.../oap/server/core/storage/PersistenceTimer.java | 98 +++++++++++++++-------
.../oap/server/core/storage/StorageDAO.java | 4 +-
.../server/core/storage/model/IModelSetter.java | 2 +-
.../oap/server/core/storage/model/Model.java | 4 +-
.../server/core/storage/model/StorageModels.java | 4 +-
.../client/elasticsearch/ElasticSearchClient.java | 28 ++-----
.../elasticsearch/ITElasticSearchClient.java | 31 +++----
.../src/main/assembly/application.yml | 3 +-
.../src/main/resources/application.yml | 3 +-
.../StorageModuleElasticsearchConfig.java | 1 -
.../StorageModuleElasticsearchProvider.java | 2 +-
.../elasticsearch/base/BatchProcessEsDAO.java | 6 +-
.../plugin/elasticsearch/base/MetricsEsDAO.java | 20 +++--
.../plugin/elasticsearch/base/StorageEsDAO.java | 6 +-
.../elasticsearch/base/StorageEsInstaller.java | 7 +-
.../elasticsearch/query/MetricsQueryEsDAO.java | 22 +++--
.../storage/plugin/jdbc/h2/dao/H2MetricsDAO.java | 7 +-
.../storage/plugin/jdbc/h2/dao/H2SQLExecutor.java | 31 ++-----
.../storage/plugin/jdbc/h2/dao/H2StorageDAO.java | 12 ++-
25 files changed, 211 insertions(+), 175 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
index d9b27f8..4e819b4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java
@@ -31,8 +31,6 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
-import static java.util.Objects.nonNull;
-
/**
* @author peng-yongsheng
*/
@@ -42,7 +40,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private final Model model;
private final MergeDataCache<Metrics> mergeDataCache;
- private final IMetricsDAO metricsDAO;
+ private final IMetricsDAO<?, ?> metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
@@ -99,40 +97,64 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
+ long start = System.currentTimeMillis();
+
List<Object> batchCollection = new LinkedList<>();
- cache.getLast().collection().forEach(data -> {
+
+ Collection<Metrics> collection = cache.getLast().collection();
+
+ int i = 0;
+ Metrics[] metrics = null;
+ for (Metrics data : collection) {
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
- Metrics dbData = null;
- try {
- dbData = metricsDAO.get(model, data);
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- try {
- if (nonNull(dbData)) {
- data.combine(dbData);
- data.calculate();
-
- batchCollection.add(metricsDAO.prepareBatchUpdate(model, data));
+ int batchGetSize = 2000;
+ int mod = i % batchGetSize;
+ if (mod == 0) {
+ int residual = collection.size() - i;
+ if (residual >= batchGetSize) {
+ metrics = new Metrics[batchGetSize];
} else {
- batchCollection.add(metricsDAO.prepareBatchInsert(model, data));
+ metrics = new Metrics[residual];
}
-
- if (Objects.nonNull(nextAlarmWorker)) {
- nextAlarmWorker.in(data);
- }
- if (Objects.nonNull(nextExportWorker)) {
- ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL);
- nextExportWorker.in(event);
+ }
+ metrics[mod] = data;
+
+ if (mod == metrics.length - 1) {
+ try {
+ Map<String, Metrics> dbMetricsMap = metricsDAO.get(model, metrics);
+
+ for (Metrics metric : metrics) {
+ if (dbMetricsMap.containsKey(metric.id())) {
+ metric.combine(dbMetricsMap.get(metric.id()));
+ metric.calculate();
+ batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
+ } else {
+ batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
+ }
+
+ if (Objects.nonNull(nextAlarmWorker)) {
+ nextAlarmWorker.in(metric);
+ }
+ if (Objects.nonNull(nextExportWorker)) {
+ ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
+ nextExportWorker.in(event);
+ }
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
}
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
}
- });
+
+ i++;
+ }
+
+ if (batchCollection.size() > 0) {
+ logger.debug("prepareBatch model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
+ }
return batchCollection;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 20d09bf..dfd4b20 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -75,19 +75,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null;
if (configService.shouldToHour()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour));
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToDay()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day));
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
if (configService.shouldToMonth()) {
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month));
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
}
- Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
+ Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 6483a11..b84d99b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -65,8 +65,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
- RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO);
+ Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 03ae693..fcb3a68 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -61,7 +61,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second));
+ Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO);
persistentWorkers.add(persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
index 596ab81..4f9f3f7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java
@@ -56,7 +56,7 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
- Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
+ Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
StreamDataMappingSetter streamDataMappingSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingSetter.class);
streamDataMappingSetter.putIfAbsent(inventoryClass);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
index 29ec617..5c2e246 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IMetricsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -27,7 +28,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
*/
public interface IMetricsDAO<INSERT, UPDATE> extends DAO {
- Metrics get(Model model, Metrics metrics) throws IOException;
+ Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException;
INSERT prepareBatchInsert(Model model, Metrics metrics) throws IOException;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 08cced4..ffe3480 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -61,17 +61,20 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
- new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
- t -> logger.error("Extract data and save failure.", t)), 1, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+ new RunnableWithExceptionProtection(() -> extractDataAndSaveRecord(batchDAO),
+ t -> logger.error("Extract data and save record failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
+
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+ new RunnableWithExceptionProtection(() -> extractDataAndSaveMetrics(batchDAO),
+ t -> logger.error("Extract data and save metrics failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
this.isStarted = true;
}
}
- @SuppressWarnings("unchecked")
- private void extractDataAndSave(IBatchDAO batchDAO) {
+ private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
- logger.debug("Extract data and save");
+ logger.debug("Extract data and save record");
}
long startTime = System.currentTimeMillis();
@@ -79,36 +82,12 @@ public enum PersistenceTimer {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List records = new LinkedList();
- List metrics = new LinkedList();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
- persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
- persistenceWorkers.forEach(worker -> {
- if (logger.isDebugEnabled()) {
- logger.debug("extract {} worker data and save", worker.getClass().getName());
- }
-
- if (worker.flushAndSwitch()) {
- List<?> batchCollection = worker.buildBatchCollection();
-
- if (logger.isDebugEnabled()) {
- logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
- }
-
- if (worker instanceof RecordPersistentWorker) {
- records.addAll(batchCollection);
- } else if (worker instanceof MetricsPersistentWorker) {
- metrics.addAll(batchCollection);
- } else if (worker instanceof TopNWorker) {
- records.addAll(batchCollection);
- } else {
- logger.error("Missing the worker {}", worker.getClass().getSimpleName());
- }
- }
- });
+ buildBatchCollection(persistenceWorkers, records);
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
@@ -122,6 +101,46 @@ public enum PersistenceTimer {
if (CollectionUtils.isNotEmpty(records)) {
batchDAO.asynchronous(records);
}
+ } finally {
+ executeLatencyTimer.finish();
+ }
+ } catch (Throwable e) {
+ errorCounter.inc();
+ logger.error(e.getMessage(), e);
+ } finally {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Persistence data save finish");
+ }
+ }
+
+ if (debug) {
+ logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ }
+ }
+
+ private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Extract data and save metrics");
+ }
+
+ long startTime = System.currentTimeMillis();
+ try {
+ HistogramMetrics.Timer timer = prepareLatency.createTimer();
+
+ List metrics = new LinkedList();
+ try {
+ List<PersistenceWorker> persistenceWorkers = new ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+ buildBatchCollection(persistenceWorkers, metrics);
+
+ if (debug) {
+ logger.info("build metrics batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ }
+ } finally {
+ timer.finish();
+ }
+
+ HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
+ try {
if (CollectionUtils.isNotEmpty(metrics)) {
batchDAO.synchronous(metrics);
}
@@ -141,4 +160,23 @@ public enum PersistenceTimer {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
+
+ @SuppressWarnings("unchecked")
+ private void buildBatchCollection(List<PersistenceWorker> persistenceWorkers, List collection) {
+ persistenceWorkers.forEach(worker -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data and save", worker.getClass().getName());
+ }
+
+ if (worker.flushAndSwitch()) {
+ List<?> batchCollection = worker.buildBatchCollection();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+ }
+
+ collection.addAll(batchCollection);
+ }
+ });
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 4d3f26d..35d178d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -26,9 +26,9 @@ import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
-public interface StorageDAO extends Service {
+public interface StorageDAO<INSERT, UPDATE> extends Service {
- IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder);
+ IMetricsDAO<INSERT, UPDATE> newMetricsDao(StorageBuilder<Metrics> storageBuilder);
IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
index a471212..b30c457 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java
@@ -26,5 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface IModelSetter extends Service {
- Model putIfAbsent(Class aClass, int scopeId, Storage storage);
+ Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index b2dccc9..9693ef7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -34,13 +34,15 @@ public class Model {
private final boolean deleteHistory;
private final List<ModelColumn> columns;
private final int scopeId;
+ private final boolean record;
- public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling) {
+ public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory, int scopeId, Downsampling downsampling, boolean record) {
this.columns = columns;
this.capableOfTimeSeries = capableOfTimeSeries;
this.downsampling = downsampling;
this.deleteHistory = deleteHistory;
this.scopeId = scopeId;
this.name = ModelName.build(downsampling, name);
+ this.record = record;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2642523..3ccf594 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -37,7 +37,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
this.models = new LinkedList<>();
}
- @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) {
+ @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record) {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId);
@@ -50,7 +50,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, storage.getModelName(), modelColumns);
- Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling());
+ Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling(), record);
models.add(model);
return model;
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index a4a4ec6..cbee9e2 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -40,10 +40,9 @@ import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
-import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
@@ -227,29 +226,13 @@ public class ElasticSearchClient implements Client {
return client.get(request);
}
- public SearchResponse idQuery(String indexName, String id) throws IOException {
- indexName = formatIndexName(indexName);
-
- SearchRequest searchRequest = new SearchRequest(indexName);
- searchRequest.types(TYPE);
- searchRequest.source().query(QueryBuilders.idsQuery().addIds(id));
- return client.search(searchRequest);
- }
-
- public Map<String, Map<String, Object>> ids(String indexName, String... ids) throws IOException {
+ public SearchResponse ids(String indexName, String[] ids) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
- SearchResponse response = client.search(searchRequest);
-
- Map<String, Map<String, Object>> result = new HashMap<>();
- SearchHit[] hits = response.getHits().getHits();
- for (SearchHit hit : hits) {
- result.put(hit.getId(), hit.getSourceAsMap());
- }
- return result;
+ return client.search(searchRequest);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
@@ -312,7 +295,7 @@ public class ElasticSearchClient implements Client {
}
}
- public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
+ public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
@@ -325,7 +308,7 @@ public class ElasticSearchClient implements Client {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
- logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
+ logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size());
}
}
@@ -337,7 +320,6 @@ public class ElasticSearchClient implements Client {
return BulkProcessor.builder(client::bulkAsync, listener)
.setBulkActions(bulkActions)
- .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
index c0c0854..6beba6b 100644
--- a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ITElasticSearchClient.java
@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -27,26 +29,13 @@ import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import org.powermock.reflect.Whitebox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -188,7 +177,7 @@ public class ITElasticSearchClient {
@Test
public void bulk() throws InterruptedException {
- BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
+ BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 10, 2);
Map<String, String> source = new HashMap<>();
source.put("column1", "value1");
@@ -246,7 +235,7 @@ public class ITElasticSearchClient {
}
private RestHighLevelClient getRestHighLevelClient() {
- return (RestHighLevelClient) Whitebox.getInternalState(client, "client");
+ return (RestHighLevelClient)Whitebox.getInternalState(client, "client");
}
private JsonObject undoFormatIndexName(JsonObject index) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index c5b01a3..bc4b745 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,8 +75,7 @@ storage:
# otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
# monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
-# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
-# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 665a852..ec79843 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,8 +75,7 @@ storage:
otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
- bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+ bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index bb5671e..48f4e0c 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -32,7 +32,6 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter private int indexReplicasNumber = 0;
@Setter private int indexRefreshInterval = 2;
@Setter private int bulkActions = 2000;
- @Setter private int bulkSize = 20;
@Setter private int flushInterval = 10;
@Setter private int concurrentRequests = 2;
@Setter private int syncBulkActions = 3;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 58e8383..63d936b 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -68,7 +68,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
}
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword());
- this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
+ this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index a3d52e9..d4d118d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -36,22 +36,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private BulkProcessor bulkProcessor;
private final int bulkActions;
- private final int bulkSize;
private final int flushInterval;
private final int concurrentRequests;
- public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+ public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int flushInterval,
int concurrentRequests) {
super(client);
this.bulkActions = bulkActions;
- this.bulkSize = bulkSize;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
}
@Override public void asynchronous(List<?> collection) {
if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
+ this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
if (logger.isDebugEnabled()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index e313c60..a0c8212 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
+import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -40,13 +41,20 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO<IndexRequest, Upd
this.storageBuilder = storageBuilder;
}
- @Override public Metrics get(Model model, Metrics metrics) throws IOException {
- SearchResponse response = getClient().idQuery(model.getName(), metrics.id());
- if (response.getHits().totalHits > 0) {
- return storageBuilder.map2Data(response.getHits().getAt(0).getSourceAsMap());
- } else {
- return null;
+ @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
+ Map<String, Metrics> result = new HashMap<>();
+
+ String[] ids = new String[metrics.length];
+ for (int i = 0; i < metrics.length; i++) {
+ ids[i] = metrics[i].id();
+ }
+
+ SearchResponse response = getClient().ids(model.getName(), ids);
+ for (int i = 0; i < response.getHits().totalHits; i++) {
+ Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
+ result.put(source.id(), source);
}
+ return result;
}
@Override public IndexRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 52fe3fb..318b9a8 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -23,17 +23,19 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
/**
* @author peng-yongsheng
*/
-public class StorageEsDAO extends EsDAO implements StorageDAO {
+public class StorageEsDAO extends EsDAO implements StorageDAO<IndexRequest, UpdateRequest> {
public StorageEsDAO(ElasticSearchClient client) {
super(client);
}
- @Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
+ @Override public IMetricsDAO<IndexRequest, UpdateRequest> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
return new MetricsEsDAO(getClient(), storageBuilder);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 7883049..0246ced 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -64,7 +64,7 @@ public class StorageEsInstaller extends ModelInstaller {
@Override protected void createTable(Client client, Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
- JsonObject settings = createSetting();
+ JsonObject settings = createSetting(model.isRecord());
JsonObject mapping = createMapping(model);
logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString());
@@ -97,13 +97,12 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
- private JsonObject createSetting() {
+ private JsonObject createSetting(boolean record) {
JsonObject setting = new JsonObject();
setting.addProperty("index.number_of_shards", indexShardsNumber);
setting.addProperty("index.number_of_replicas", indexReplicasNumber);
- setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString());
+ setting.addProperty("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString());
setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
- TimeValue.timeValueSeconds(3);
return setting;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
index 7df8ba6..dd36e41 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetricsQueryEsDAO.java
@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@@ -102,15 +103,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
@Override public IntValues getLinearIntValues(String indName, Downsampling downsampling, List<String> ids, String valueCName) throws IOException {
String indexName = ModelName.build(downsampling, indName);
- Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
+ SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
+ Map<String, Map<String, Object>> idMap = toMap(response);
IntValues intValues = new IntValues();
for (String id : ids) {
KVInt kvInt = new KVInt();
kvInt.setId(id);
kvInt.setValue(0);
- if (response.containsKey(id)) {
- Map<String, Object> source = response.get(id);
+ if (idMap.containsKey(id)) {
+ Map<String, Object> source = idMap.get(id);
kvInt.setValue(((Number)source.getOrDefault(valueCName, 0)).longValue());
}
intValues.getValues().add(kvInt);
@@ -125,11 +127,12 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
Thermodynamic thermodynamic = new Thermodynamic();
List<List<Long>> thermodynamicValueMatrix = new ArrayList<>();
- Map<String, Map<String, Object>> response = getClient().ids(indexName, ids.toArray(new String[0]));
+ SearchResponse response = getClient().ids(indexName, ids.toArray(new String[0]));
+ Map<String, Map<String, Object>> idMap = toMap(response);
int numOfSteps = 0;
for (String id : ids) {
- Map<String, Object> source = response.get(id);
+ Map<String, Object> source = idMap.get(id);
if (source == null) {
// add empty list to represent no data exist for this time bucket
thermodynamicValueMatrix.add(new ArrayList<>());
@@ -159,4 +162,13 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
return thermodynamic;
}
+
+ private Map<String, Map<String, Object>> toMap(SearchResponse response) {
+ Map<String, Map<String, Object>> result = new HashMap<>();
+ SearchHit[] hits = response.getHits().getHits();
+ for (SearchHit hit : hits) {
+ result.put(hit.getId(), hit.getSourceAsMap());
+ }
+ return result;
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
index 70318cc..1943c94 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetricsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
+import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -29,6 +30,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
* @author wusheng
*/
public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecutor, SQLExecutor> {
+
private JDBCHikariCPClient h2Client;
private StorageBuilder<Metrics> storageBuilder;
@@ -37,8 +39,9 @@ public class H2MetricsDAO extends H2SQLExecutor implements IMetricsDAO<SQLExecut
this.storageBuilder = storageBuilder;
}
- @Override public Metrics get(Model model, Metrics metrics) throws IOException {
- return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
+ @Override public Map<String, Metrics> get(Model model, Metrics[] metrics) throws IOException {
+ // return (Metrics)getByID(h2Client, model.getName(), metrics.id(), storageBuilder);
+ return null;
}
@Override public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
index 21d3d3c..3ab7d29 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java
@@ -19,26 +19,17 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.sql.*;
+import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
+import org.slf4j.*;
/**
* @author wusheng
@@ -52,9 +43,7 @@ public class H2SQLExecutor {
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE id = ?", id)) {
return toStorageData(rs, modelName, storageBuilder);
}
- } catch (SQLException e) {
- throw new IOException(e.getMessage(), e);
- } catch (JDBCClientException e) {
+ } catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
@@ -65,9 +54,7 @@ public class H2SQLExecutor {
try (ResultSet rs = h2Client.executeQuery(connection, "SELECT * FROM " + modelName + " WHERE " + columnName + " = ?", value)) {
return toStorageData(rs, modelName, storageBuilder);
}
- } catch (SQLException e) {
- throw new IOException(e.getMessage(), e);
- } catch (JDBCClientException e) {
+ } catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
@@ -92,9 +79,7 @@ public class H2SQLExecutor {
return rs.getInt(ServiceInstanceInventory.SEQUENCE);
}
}
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- } catch (JDBCClientException e) {
+ } catch (SQLException | JDBCClientException e) {
logger.error(e.getMessage(), e);
}
return Const.NONE;
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
index d23001d..d8ef839 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java
@@ -21,24 +21,22 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
/**
* @author wusheng
*/
-public class H2StorageDAO implements StorageDAO {
+public class H2StorageDAO implements StorageDAO<SQLExecutor, SQLExecutor> {
+
private JDBCHikariCPClient h2Client;
public H2StorageDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
- @Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
+ @Override public IMetricsDAO<SQLExecutor, SQLExecutor> newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
return new H2MetricsDAO(h2Client, storageBuilder);
}