You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/08/10 06:50:26 UTC

[skywalking] branch master updated: Super Size Dataset record index es rolling step (#5282)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 2df3c68  Super Size Dataset record index  es rolling step (#5282)
2df3c68 is described below

commit 2df3c683baede7a0b7f24885ce1bbe48b13c7dac
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Mon Aug 10 14:49:11 2020 +0800

    Super Size Dataset record index  es rolling step (#5282)
---
 docs/en/setup/backend/backend-storage.md           |  4 ++
 .../src/main/resources/application.yml             |  2 +
 .../StorageModuleElasticsearchConfig.java          |  5 +++
 .../StorageModuleElasticsearchProvider.java        | 49 ++++++++++++----------
 .../plugin/elasticsearch/base/TimeSeriesUtils.java | 11 ++++-
 .../elasticsearch/base/TimeSeriesUtilsTest.java    | 40 ++++++++++++++++++
 .../StorageModuleElasticsearch7Provider.java       |  8 ++++
 7 files changed, 96 insertions(+), 23 deletions(-)

diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index e6dcd47..bd48a56 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -54,6 +54,7 @@ storage:
     trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
     trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than or equal to 0
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
@@ -105,6 +106,9 @@ Such as, if dayStep == 11,
 1. data in [2000-01-01, 2000-01-11] will be merged into the index-20000101.
 1. data in [2000-01-12, 2000-01-22] will be merged into the index-20000112.
 
+`storage/elasticsearch/superDatasetDayStep` overrides `storage/elasticsearch/dayStep` when its value is positive.
+This affects record-related entities, such as trace segments. In some cases the metrics data is much smaller than the record (trace) data, so a separate day step for the super-size dataset helps balance shards across the ElasticSearch cluster.
+ 
 NOTICE, TTL deletion would be affected by these. You should set an extra more dayStep in your TTL. Such as you want to TTL == 30 days and dayStep == 10, you actually need to set TTL = 40;
 
 ### Secrets Management File Of ElasticSearch Authentication
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 1bda6a9..b6c585a 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -101,6 +101,7 @@ storage:
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than or equal to 0
     indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:1} # Shard number of new indexes
     superDatasetIndexShardsFactor: ${SW_STORAGE_ES_SUPER_DATASET_INDEX_SHARDS_FACTOR:5} # Super data set has been defined in the codes, such as trace segments. This factor provides more shards for the super data set, shards number = indexShardsNumber * superDatasetIndexShardsFactor. Also, this factor effects Zipkin and Jaeger traces.
     indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
@@ -119,6 +120,7 @@ storage:
     trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
     trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
     dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
+    superDatasetDayStep: ${SW_SUPERDATASET_STORAGE_DAY_STEP:-1} # Represent the number of days in the super size dataset record index, the default value is the same as dayStep when the value is less than or equal to 0
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     secretsManagementFile: ${SW_ES_SECRETS_MANAGEMENT_FILE:""} # Secrets management file in the properties format includes the username, password, which are managed by 3rd party tool.
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 6bcb653..4fa466d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -80,6 +80,11 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
      */
     @Getter
     private int dayStep = 1;
+    /**
+     * @since 8.2.0 The day step used to roll the super-size dataset record indexes; applied only when greater than 0, otherwise the dayStep setting is used.
+     */
+    @Getter
+    private int superDatasetDayStep = -1;
     @Setter
     private int resultWindowMaxSize = 10000;
     @Setter
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 75f17f7..030b965 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -114,11 +114,15 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         }
         if (config.getDayStep() > 1) {
             TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+        }
+        if (config.getSuperDatasetDayStep() > 0) {
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
         }
 
         if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
             MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
-                    10, readableContents -> {
+                10, readableContents -> {
                 final byte[] secretsFileContent = readableContents.get(0);
                 if (secretsFileContent == null) {
                     return;
@@ -146,47 +150,50 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         }
 
         elasticSearchClient = new ElasticSearchClient(
-                config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
-                .getTrustStorePass(), config.getUser(), config.getPassword(),
-                indexNameConverters(config.getNameSpace())
+            config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
+            .getTrustStorePass(), config.getUser(), config.getPassword(),
+            indexNameConverters(config.getNameSpace())
         );
 
         this.registerServiceImplementation(
-                IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
-                        .getFlushInterval(), config.getConcurrentRequests()));
+            IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
+                .getFlushInterval(), config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
+            IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(elasticSearchClient, config
-                        .getResultWindowMaxSize()));
+            INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(elasticSearchClient, config
+                .getResultWindowMaxSize()));
         this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
+            ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
         this.registerServiceImplementation(
-                IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
+            IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
         this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
         this.registerServiceImplementation(
-                IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
-                        .getProfileTaskQueryMaxSize()));
+            IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
+                .getProfileTaskQueryMaxSize()));
         this.registerServiceImplementation(
-                UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearchClient));
+            UITemplateManagementDAO.class, new UITemplateManagementEsDAO(elasticSearchClient));
     }
 
     @Override
     public void start() throws ModuleStartException {
-        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
-        HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+                                                   .provider()
+                                                   .getService(MetricsCreator.class);
+        HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
+            "storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
         elasticSearchClient.registerChecker(healthChecker);
         try {
             elasticSearchClient.connect();
@@ -204,7 +211,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
 
     @Override
     public String[] requiredModules() {
-        return new String[]{CoreModule.NAME};
+        return new String[] {CoreModule.NAME};
     }
 
     public static List<IndexNameConverter> indexNameConverters(String namespace) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
index 55c2fc6..6764def 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtils.java
@@ -40,13 +40,18 @@ public class TimeSeriesUtils {
     private static final DateTime DAY_ONE = TIME_BUCKET_FORMATTER.parseDateTime("20000101");
     @Setter
     private static int DAY_STEP = 1;
+    @Setter
+    private static int SUPER_DATASET_DAY_STEP = 1;
 
     /**
      * @return formatted latest index name, based on current timestamp.
      */
     public static String latestWriteIndexName(Model model) {
         long timeBucket;
-        if (model.isRecord()) {
+        if (model.isRecord() && model.isSuperDataset()) {
+            timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
+            return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
+        } else if (model.isRecord()) {
             timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
             return model.getName() + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
         } else {
@@ -61,7 +66,9 @@ public class TimeSeriesUtils {
     static String writeIndexName(Model model, long timeBucket) {
         final String modelName = model.getName();
 
-        if (model.isRecord()) {
+        if (model.isRecord() && model.isSuperDataset()) {
+            return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, SUPER_DATASET_DAY_STEP);
+        } else if (model.isRecord()) {
             return modelName + Const.LINE + compressTimeBucket(timeBucket / 1000000, DAY_STEP);
         } else {
             switch (model.getDownsampling()) {
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
index f46eb13..20442d7 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/TimeSeriesUtilsTest.java
@@ -18,12 +18,37 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
 
 import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.compressTimeBucket;
+import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils.writeIndexName;
 
 public class TimeSeriesUtilsTest {
+
+    private Model superDatasetModel;
+    private Model normalRecordModel;
+    private Model normalMetricsModel;
+
+    @Before
+    public void prepare() {
+        superDatasetModel = new Model("superDatasetModel", Lists.newArrayList(), Lists.newArrayList(),
+                                      0, DownSampling.Minute, true, true
+        );
+        normalRecordModel = new Model("normalRecordModel", Lists.newArrayList(), Lists.newArrayList(),
+                                      0, DownSampling.Minute, true, false
+        );
+        normalMetricsModel = new Model("normalMetricsModel", Lists.newArrayList(), Lists.newArrayList(),
+                                       0, DownSampling.Minute, false, false
+        );
+        TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(1);
+        TimeSeriesUtils.setDAY_STEP(3);
+    }
+
     @Test
     public void testCompressTimeBucket() {
         Assert.assertEquals(20000101L, compressTimeBucket(20000105, 11));
@@ -33,4 +58,19 @@ public class TimeSeriesUtilsTest {
         Assert.assertEquals(20000123L, compressTimeBucket(20000123, 11));
         Assert.assertEquals(20000123L, compressTimeBucket(20000125, 11));
     }
+
+    @Test
+    public void testIndexRolling() {
+        long secondTimeBucket = 2020_0809_1010_59L;
+        long minuteTimeBucket = 2020_0809_1010L;
+        Assert.assertEquals("superDatasetModel-20200809", writeIndexName(superDatasetModel, secondTimeBucket));
+        Assert.assertEquals("normalRecordModel-20200807", writeIndexName(normalRecordModel, secondTimeBucket));
+        Assert.assertEquals("normalMetricsModel-20200807", writeIndexName(normalMetricsModel, minuteTimeBucket));
+        secondTimeBucket += 1000000;
+        minuteTimeBucket += 10000;
+        Assert.assertEquals("superDatasetModel-20200810", writeIndexName(superDatasetModel, secondTimeBucket));
+        Assert.assertEquals("normalRecordModel-20200810", writeIndexName(normalRecordModel, secondTimeBucket));
+        Assert.assertEquals("normalMetricsModel-20200810", writeIndexName(normalMetricsModel, minuteTimeBucket));
+    }
+
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index 98da53c..414946f 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -54,6 +54,7 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
 import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
@@ -110,6 +111,13 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
         if (!StringUtil.isEmpty(config.getNameSpace())) {
             config.setNameSpace(config.getNameSpace().toLowerCase());
         }
+        if (config.getDayStep() > 1) {
+            TimeSeriesUtils.setDAY_STEP(config.getDayStep());
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
+        }
+        if (config.getSuperDatasetDayStep() > 0) {
+            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
+        }
         if (!StringUtil.isEmpty(config.getSecretsManagementFile())) {
             MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
                     10, readableContents -> {