You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/07/15 10:06:29 UTC
[skywalking] branch master updated: Performance: optimize IDs read
of ElasticSearch storage options(6 and 7) (#7307)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new c59ee90 Performance: optimize IDs read of ElasticSearch storage options(6 and 7) (#7307)
c59ee90 is described below
commit c59ee900299d24d15780c0b1600c823c3ea111eb
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Jul 15 18:06:11 2021 +0800
Performance: optimize IDs read of ElasticSearch storage options(6 and 7) (#7307)
* Performance: optimize IDs read of ElasticSearch storage options(6 and 7). Use the physical index rather than template alias name.
---
.github/workflows/e2e.log.yaml | 1 +
CHANGES.md | 2 +
.../oap/server/core/analysis/MetricsExtension.java | 7 +++
.../analysis/worker/ManagementStreamProcessor.java | 3 +-
.../analysis/worker/MetricsStreamProcessor.java | 8 ++-
.../core/analysis/worker/NoneStreamProcessor.java | 3 +-
.../analysis/worker/RecordStreamProcessor.java | 3 +-
.../core/analysis/worker/TopNStreamProcessor.java | 7 ++-
.../oap/server/core/storage/IHistoryDeleteDAO.java | 16 +++++-
.../server/core/storage/annotation/Storage.java | 1 +
.../oap/server/core/storage/model/Model.java | 5 +-
.../server/core/storage/model/StorageModels.java | 3 +-
.../core/storage/ttl/DataTTLKeeperTimer.java | 36 +++++++++---
.../core/storage/model/StorageModelsTest.java | 2 +-
.../elasticsearch/IndicesMetadataCache.java} | 38 +++++++------
.../elasticsearch/base/HistoryDeleteEsDAO.java | 34 +++++++++++
.../plugin/elasticsearch/base/MetricsEsDAO.java | 65 ++++++++++++++++++----
.../elasticsearch/base/TimeSeriesUtilsTest.java | 6 +-
18 files changed, 191 insertions(+), 49 deletions(-)
diff --git a/.github/workflows/e2e.log.yaml b/.github/workflows/e2e.log.yaml
index 116b15e..c8a4d9a 100644
--- a/.github/workflows/e2e.log.yaml
+++ b/.github/workflows/e2e.log.yaml
@@ -36,6 +36,7 @@ jobs:
if: (github.event_name == 'schedule' && github.repository == 'apache/skywalking') || (github.event_name != 'schedule')
name: Log
runs-on: ubuntu-latest
+ timeout-minutes: 90
strategy:
matrix:
storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb']
diff --git a/CHANGES.md b/CHANGES.md
index e399545..47a03e2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -92,6 +92,8 @@ Release Notes.
more chances including duplicate elements. Don't need this as indicate anymore.
* Reduce the flush period of hour and day level metrics, only run in 4 times of regular persistent period. This means
default flush period of hour and day level metrics are 25s * 4.
+* Performance: optimize IDs read of ElasticSearch storage options(6 and 7). Use the physical index rather than template
+ alias name.
#### UI
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java
index ebe9dd8..368ea90 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java
@@ -39,4 +39,11 @@ public @interface MetricsExtension {
* @return true if this metrics data could be updated.
*/
boolean supportUpdate();
+
+ /**
+ * @return true means the ID of this metric entity would generate timestamp related ID, such as 20170128-serviceId.
+ * If as false, then, ID would be like serviceId directly. This is typically used for metadata level metric, such as
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic}
+ */
+ boolean timeRelativeID() default false;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
index e516128..fb5ceec 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ManagementStreamProcessor.java
@@ -77,7 +77,8 @@ public class ManagementStreamProcessor implements StreamProcessor<ManagementData
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
- Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), DownSampling.None), false);
+ // Management stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
+ Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.None), false);
final ManagementPersistentWorker persistentWorker = new ManagementPersistentWorker(moduleDefineHolder, model, managementDAO);
workers.put(streamClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index 367d729..6c323f4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -148,19 +148,21 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
*/
boolean supportDownSampling = true;
boolean supportUpdate = true;
+ boolean timeRelativeID = true;
if (metricsExtension != null) {
supportDownSampling = metricsExtension.supportDownSampling();
supportUpdate = metricsExtension.supportUpdate();
+ timeRelativeID = metricsExtension.timeRelativeID();
}
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
+ metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour), false);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
+ metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day), false);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
@@ -169,7 +171,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
Model model = modelSetter.add(
- metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
+ metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
index 00cda59..418190e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamProcessor.java
@@ -77,7 +77,8 @@ public class NoneStreamProcessor implements StreamProcessor<NoneStream> {
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
- Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
+ // None stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
+ Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
final NoneStreamPersistentWorker persistentWorker = new NoneStreamPersistentWorker(moduleDefineHolder, model, noneStream);
workers.put(streamClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
index 23a6531..e1c586d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java
@@ -73,8 +73,9 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
+ // Record stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
Model model = modelSetter.add(
- recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
+ recordClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
workers.put(recordClass, persistentWorker);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
index 5a0c6a4..26a8b37 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
@@ -67,7 +67,9 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
@Override
@SuppressWarnings("unchecked")
- public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) throws StorageException {
+ public void create(ModuleDefineHolder moduleDefineHolder,
+ Stream stream,
+ Class<? extends TopN> topNClass) throws StorageException {
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
@@ -83,8 +85,9 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
+ // Top N metrics doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
Model model = modelSetter.add(
- topNClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
+ topNClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(
moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
index 04eb718..2cc34e0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IHistoryDeleteDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
+import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -31,8 +32,21 @@ public interface IHistoryDeleteDAO extends DAO {
*
* @param model data entity.
* @param timeBucketColumnName column name represents the time. Right now, always {@link Metrics#TIME_BUCKET}
- * @param ttl the number of days should be kept
+ * @param ttl the number of days should be kept
* @throws IOException when error happens in the deletion process.
*/
void deleteHistory(Model model, String timeBucketColumnName, int ttl) throws IOException;
+
+ /**
+ * Inspection is also driven by the TTL timer. This method is optional to implement, typically, this could be used
+ * to do routing inspection for timer series data, and get the latest status of existing data boundaries(oldest and
+ * latest).
+ *
+ * @param models model list
+ * @param timeBucketColumnName column name represents the time. Right now, always {@link Metrics#TIME_BUCKET}
+ * @throws IOException when error happens in the deletion process.
+ */
+ default void inspect(List<Model> models, String timeBucketColumnName) throws IOException {
+
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
index b2bf7a6..32a267f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java
@@ -25,5 +25,6 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
@RequiredArgsConstructor
public class Storage {
private final String modelName;
+ private final boolean timeRelativeID;
private final DownSampling downsampling;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
index 8c1f575..b373756 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/Model.java
@@ -38,6 +38,7 @@ public class Model {
private final boolean superDataset;
private final boolean isTimeSeries;
private final String aggregationFunctionName;
+ private final boolean timeRelativeID;
public Model(final String name,
final List<ModelColumn> columns,
@@ -46,7 +47,8 @@ public class Model {
final DownSampling downsampling,
final boolean record,
final boolean superDataset,
- final String aggregationFunctionName) {
+ final String aggregationFunctionName,
+ boolean timeRelativeID) {
this.name = name;
this.columns = columns;
this.extraQueryIndices = extraQueryIndices;
@@ -56,5 +58,6 @@ public class Model {
this.record = record;
this.superDataset = superDataset;
this.aggregationFunctionName = aggregationFunctionName;
+ this.timeRelativeID = timeRelativeID;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index f2e1eaa..79152af 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -63,7 +63,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
storage.getModelName(), modelColumns, extraQueryIndices, scopeId,
storage.getDownsampling(), record,
isSuperDatasetModel(aClass),
- FunctionCategory.uniqueFunctionName(aClass)
+ FunctionCategory.uniqueFunctionName(aClass),
+ storage.isTimeRelativeID()
);
this.followColumnNameRules(model);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
index 038d169..53f565a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java
@@ -72,16 +72,25 @@ public enum DataTTLKeeperTimer {
* node list from {@link ClusterNodesQuery}.
*/
private void delete() {
- List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
- if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
- log.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
- return;
- }
-
- log.info("Beginning to remove expired metrics from the storage.");
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.allModels();
- models.forEach(this::execute);
+
+ try {
+ List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
+ if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
+ log.info(
+ "The selected first getAddress is {}. The remove stage is skipped.",
+ remoteInstances.get(0).toString()
+ );
+ return;
+ }
+
+ log.info("Beginning to remove expired metrics from the storage.");
+ models.forEach(this::execute);
+ } finally {
+ log.info("Beginning to inspect data boundaries.");
+ this.inspect(models);
+ }
}
private void execute(Model model) {
@@ -100,4 +109,15 @@ public enum DataTTLKeeperTimer {
log.error(e.getMessage(), e);
}
}
+
+ private void inspect(List<Model> models) {
+ try {
+ moduleManager.find(StorageModule.NAME)
+ .provider()
+ .getService(IHistoryDeleteDAO.class)
+ .inspect(models, Metrics.TIME_BUCKET);
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java
index 46838cc..c9db859 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java
@@ -53,7 +53,7 @@ public class StorageModelsTest {
public void testStorageModels() throws StorageException {
StorageModels models = new StorageModels();
models.add(TestModel.class, -1,
- new Storage("StorageModelsTest", DownSampling.Hour),
+ new Storage("StorageModelsTest", false, DownSampling.Hour),
false
);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
similarity index 52%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java
copy to oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
index ebe9dd8..e1fb00c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/MetricsExtension.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/IndicesMetadataCache.java
@@ -16,27 +16,33 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import java.util.HashSet;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
/**
- * MetricsExtension annotation defines extension attributes of the {@link Stream} with {@link MetricsStreamProcessor}.
+ * IndicesMetadataCache hosts all pseudo real time metadata of indices.
*/
-@Target(ElementType.TYPE)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface MetricsExtension {
- /**
- * @return true if this metrics stream support down sampling.
- */
- boolean supportDownSampling();
+@Slf4j
+public class IndicesMetadataCache {
+ public static IndicesMetadataCache INSTANCE = new IndicesMetadataCache();
+
+ private volatile HashSet<String> existingIndices;
+
+ private IndicesMetadataCache() {
+ existingIndices = new HashSet<>();
+ }
+
+ public void update(List<String> indices) {
+ existingIndices = new HashSet<>(indices);
+ }
/**
- * @return true if this metrics data could be updated.
+ * @return true if given index name exists currently.
*/
- boolean supportUpdate();
+ public boolean isExisting(String index) {
+ return existingIndices.contains(index);
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index c233157..af73fc9 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.joda.time.DateTime;
@Slf4j
@@ -73,4 +74,37 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
client.createIndex(latestIndex);
}
}
+
+ @Override
+ public void inspect(List<Model> models, String timeBucketColumnName) {
+ List<String> indices = new ArrayList<>();
+ models.forEach(model -> {
+ if (!model.isTimeSeries()) {
+ return;
+ }
+
+ ElasticSearchClient client = getClient();
+
+ if (!model.isRecord()) {
+ if (!DownSampling.Minute.equals(model.getDownsampling())) {
+ /*
+ * As all metrics data in different down sampling rule of one day are in the same index, the inspection
+ * operation is only required to run once.
+ */
+ return;
+ }
+ }
+ String tableName = IndexController.INSTANCE.getTableName(model);
+ List<String> indexes;
+ try {
+ indexes = client.retrievalIndexByAliases(tableName);
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ return;
+ }
+
+ indices.addAll(indexes);
+ });
+ IndicesMetadataCache.INSTANCE.update(indices);
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
index 7a72838..4fb5090 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/MetricsEsDAO.java
@@ -20,7 +20,10 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
@@ -28,9 +31,13 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.IndicesMetadataCache;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import static java.util.stream.Collectors.groupingBy;
+
+@Slf4j
public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
protected final StorageHashMapBuilder<Metrics> storageBuilder;
@@ -42,16 +49,54 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
- String tableName = IndexController.INSTANCE.getTableName(model);
- String[] ids = metrics.stream()
- .map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
- .toArray(String[]::new);
- SearchResponse response = getClient().ids(tableName, ids);
- List<Metrics> result = new ArrayList<>(response.getHits().getHits().length);
- for (int i = 0; i < response.getHits().getHits().length; i++) {
- Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap());
- result.add(source);
- }
+ Map<String, List<Metrics>> groupIndices
+ = metrics.stream()
+ .collect(
+ groupingBy(metric -> {
+ if (model.isTimeRelativeID()) {
+ // Try to use with timestamp index name(write index),
+ // if latest cache shows this name doesn't exist,
+ // then fail back to template alias name.
+ // This should only happen in very rare case, such as this is the time to create new index
+ // as a new day comes, and the index cache is pseudo real time.
+ // This case doesn't affect the result, just has lower performance due to using the alias name.
+ // Another case is that a removed index showing existing also due to latency,
+ // which could cause multiGet fails
+ // but this should not happen in the real runtime, TTL timer only removed the oldest indices,
+ // which should not have an update/insert.
+ String indexName = TimeSeriesUtils.writeIndexName(model, metric.getTimeBucket());
+ // Format the name to follow the global physical index naming policy.
+ if (!IndicesMetadataCache.INSTANCE.isExisting(
+ getClient().formatIndexName(indexName))) {
+ indexName = IndexController.INSTANCE.getTableName(model);
+ }
+ return indexName;
+ } else {
+ // Metadata level metrics, always use alias name, due to the physical index of the records
+ // can't be located through timestamp.
+ return IndexController.INSTANCE.getTableName(model);
+ }
+ })
+ );
+
+ // The groupIndices mostly include one or two group,
+ // the current day and the T-1 day(if at the edge between days)
+ List<Metrics> result = new ArrayList<>(metrics.size());
+ groupIndices.forEach((tableName, metricList) -> {
+ String[] ids = metrics.stream()
+ .map(item -> IndexController.INSTANCE.generateDocId(model, item.id()))
+ .toArray(String[]::new);
+ try {
+ SearchResponse response = getClient().ids(tableName, ids);
+ for (int i = 0; i < response.getHits().getHits().length; i++) {
+ Metrics source = storageBuilder.storage2Entity(response.getHits().getAt(i).getSourceAsMap());
+ result.add(source);
+ }
+ } catch (IOException e) {
+ log.error("multiGet id=" + Arrays.toString(ids) + " from " + tableName + " fails.", e);
+ }
+ });
+
return result;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index 27ba497..d1a4fa5 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -37,13 +37,13 @@ public class TimeSeriesUtilsTest {
@Before
public void prepare() {
superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(), Lists.newArrayList(),
- 0, DownSampling.Minute, true, true, ""
+ 0, DownSampling.Minute, true, true, "", true
);
normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(), Lists.newArrayList(),
- 0, DownSampling.Minute, true, false, ""
+ 0, DownSampling.Minute, true, false, "", true
);
normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(), Lists.newArrayList(),
- 0, DownSampling.Minute, false, false, ""
+ 0, DownSampling.Minute, false, false, "", true
);
TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
TimeSeriesUtils.setDAY_STEP(3);