You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2023/01/17 07:56:38 UTC

[GitHub] [skywalking] chenhaipeng opened a new pull request, #10287: Feat/es batch byte size

chenhaipeng opened a new pull request, #10287:
URL: https://github.com/apache/skywalking/pull/10287

   
   ### Elasticsearch storage: Support batchOfBytes configuration to set up the batch bytes size of the record data. 
    Execute the async bulk record data every 5000 requests, sometime  the request body is too large, causing the es server to reject it, i think the best way is provide byte-sized configuration.
   - [x] If this is non-trivial feature, paste the links/URLs to the design doc.
   - [x] Update the documentation to include this new feature.
   - [ ] Tests(including UT, IT, E2E) are added to verify the new feature.
   - [ ] If it's UI related, attach the screenshots below.
   - [x ] If this pull request closes/resolves/fixes an existing issue, replace the issue number. Closes #<issue number>.
   - [x ] Update the [`CHANGES` log](https://github.com/apache/skywalking/blob/master/docs/en/changes/changes.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071916130


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   i think so , where is it better to control sizeOfBytes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1073248987


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   >  5m is about 3000 per dimension
   
   What does this mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1410225382

   @chenhaipeng Have you been back on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072210868


##########
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java:
##########
@@ -88,6 +88,8 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
      * @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy.
      */
     private int bulkActions = 5000;
+
+    private int batchOfBytes = 1024 * 1024 * 5;

Review Comment:
   ```suggestion
       /**
        * @since 9.4.0 A threshold to control the max body size of ElasticSearch Bulk flush. 
        */
       private int batchOfBytes = 1024 * 1024 * 5;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1385562741

   One unresolved question, https://github.com/apache/skywalking/pull/10287#discussion_r1072219692
   
   Pending on @kezhenxu94 review, this part is maintained by him.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072208151


##########
docs/en/changes/changes.md:
##########
@@ -78,6 +78,8 @@
 * Fix TCP service instances are lack of instance properties like `pod` and `namespace`, which causes Pod log not to work for TCP workloads.
 * Add Python HBase happybase module component ID(94).
 * Fix gRPC alarm cannot update settings from dynamic configuration source.
+* Elasticsearch storage: Support batchOfBytes configuration to set up the batch bytes size of the record data.

Review Comment:
   ```suggestion
   * Add `batchOfBytes` configuration to limit the size of bulk flush.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng merged pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng merged PR #10287:
URL: https://github.com/apache/skywalking/pull/10287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071934809


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   Then, changing `BulkProcessor#doFlush` should be good enough and easy enough. Could you try that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072187489


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > yes , this mechanism is to resolve too large an HTTP body
   
   As you set the default value to 5M, could you share what is the size limitation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1385404627

   > [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.
   
   @chenhaipeng I updated the PR title, AFAIK, both record(trace/log) and metrics are using this. Are you aware of this? Your PR title was about `record` only.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071875685


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   @kezhenxu94 I have a little concern about the payload of this, which pushed a request to byte[] but only for length.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1073256194


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   OK, 3k-10k docs should not be an issue. After all, the bulk mixed metrics, traces and logs. Let's keep the default size of 10m. For today's network performance, a 10m flush should not be a problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "chenhaipeng (via GitHub)" <gi...@apache.org>.
chenhaipeng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1418413760

   > @chenhaipeng my review comments are not addressed yet :)
   
   ok , please code review again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1096897918


##########
docs/en/setup/backend/configuration-vocabulary.md:
##########
@@ -106,6 +106,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | -                       | -             | superDatasetIndexReplicasNumber                                                                                                                                          | Represents the replicas number in the super size dataset record index.                                                                                                                                                                                                                                                                                                                                                                                          | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0                                                                                            |
 | -                       | -             | indexTemplateOrder                                                                                                                                                       | The order of index template.                                                                                                                                                                                                                                                                                                                                                                                                                                    | SW_STORAGE_ES_INDEX_TEMPLATE_ORDER                | 0                                                                                            |
 | -                       | -             | bulkActions                                                                                                                                                              | Async bulk size of the record data batch execution.                                                                                                                                                                                                                                                                                                                                                                                                             | SW_STORAGE_ES_BULK_ACTIONS                        | 5000                                                                                         |
+| -                       | -              | batchOfBytes                                                                                                                                                             | A threshold to control the max body size of ElasticSearch Bulk flush.                                                                                                                                                                                                                                                                                                                           | SW_STORAGE_ES_BATCH_OF_BYTES                      | 5242880                                                                                      |

Review Comment:
   ```suggestion
   | -                       | -              | batchOfBytes                                                                                                                                                             | A threshold to control the max body size of ElasticSearch Bulk flush.                                                                                                                                                                                                                                                                                                                           | SW_STORAGE_ES_BATCH_OF_BYTES                      | 10485760  (10m)                                                                                    |
   ```
   Do you miss this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "wu-sheng (via GitHub)" <gi...@apache.org>.
wu-sheng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1411715220

   > > @chenhaipeng Have you been back on this?
   > 
   > yes,i change the default value to 10M
   
   How about Zhenxu review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071872344


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -177,9 +185,20 @@ private CompletableFuture<Void> doFlush(final List<Holder> batch) {
         return future;
     }
 
+
+    private byte[] toByteArray (Object obj) throws IOException {

Review Comment:
   This seems a code format issue..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072209563


##########
docs/en/setup/backend/configuration-vocabulary.md:
##########
@@ -106,6 +106,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | -                       | -             | superDatasetIndexReplicasNumber                                                                                                                                          | Represents the replicas number in the super size dataset record index.                                                                                                                                                                                                                                                                                                                                                                                          | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0                                                                                            |
 | -                       | -             | indexTemplateOrder                                                                                                                                                       | The order of index template.                                                                                                                                                                                                                                                                                                                                                                                                                                    | SW_STORAGE_ES_INDEX_TEMPLATE_ORDER                | 0                                                                                            |
 | -                       | -             | bulkActions                                                                                                                                                              | Async bulk size of the record data batch execution.                                                                                                                                                                                                                                                                                                                                                                                                             | SW_STORAGE_ES_BULK_ACTIONS                        | 5000                                                                                         |
+| -                       | -              | batchOfBytes                                                                                                                                                             | Async bulk byte size of the record data batch execution,If the batch data is too large, it will be split into the specified size.default is 5M                                                                                                                                                                                                                                                                                                                            | SW_STORAGE_ES_BATCH_OF_BYTES                      | 5242880                                                                                      |

Review Comment:
   ```suggestion
   | -                       | -              | batchOfBytes                                                                                                                                                             | A threshold to control the max body size of ElasticSearch Bulk flush.                                                                                                                                                                                                                                                                                                                           | SW_STORAGE_ES_BATCH_OF_BYTES                      | 5242880                                                                                      |
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071891893


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   In another word, we should make `sizeOfBytes` a protection mechanism rather than a flush mechanism. Add this into `BulkProcessor#doFlush` to control the final size of the HTTP request.
   
   @chenhaipeng What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071876796


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -177,9 +185,20 @@ private CompletableFuture<Void> doFlush(final List<Holder> batch) {
         return future;
     }
 
+
+    private byte[] toByteArray (Object obj) throws IOException {

Review Comment:
   ok i fix it
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072219692


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   So, you are facing a 100m limitation? I am feeling 5m is very small, and if there are many tags/logs in the segment/log, we may flush a few data into the storage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072224961


##########
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java:
##########
@@ -88,6 +88,8 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
      * @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy.
      */
     private int bulkActions = 5000;
+
+    private int batchOfBytes = 1024 * 1024 * 5;

Review Comment:
   > Please update the `application.yml` comments accordingly.
   
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1385385690

   `configuration-vocabulary.md` is missed to update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] kezhenxu94 commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1073065851


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,76 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        futures.stream().map(future -> future.join());
+        semaphore.release();
         lastFlushTS = System.currentTimeMillis();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
+    private List<CompletableFuture<Void>> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
-
         if (batch.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return Collections.emptyList();
         }
-
-        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
-            try {
-                final RequestFactory rf = v.requestFactory();
-                final List<byte[]> bs = new ArrayList<>();
-                for (final Holder holder : batch) {
-                    bs.add(v.codec().encode(holder.request));
-                    bs.add("\n".getBytes());
+        try {
+            int bufferOfBytes = 0;
+            Codec codec = es.get().version().get().codec();
+            final List<byte[]> bs = new ArrayList<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            List<ByteBuf> byteBufList = new ArrayList<>();
+            for (final Holder holder : batch) {
+                byte[] bytes = codec.encode(holder.request);
+                bs.add(bytes);
+                bs.add("\n".getBytes());
+                bufferOfBytes += bytes.length;
+                if (bufferOfBytes > batchOfBytes) {
+                    final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
+                    byteBufList.add(content);
+                    bs.clear();

Review Comment:
   ```suggestion
                       bs.clear();
                       bufferOfBytes = 0;
   ```



##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,76 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        futures.stream().map(future -> future.join());
+        semaphore.release();
         lastFlushTS = System.currentTimeMillis();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
+    private List<CompletableFuture<Void>> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
-
         if (batch.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return Collections.emptyList();
         }
-
-        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
-            try {
-                final RequestFactory rf = v.requestFactory();
-                final List<byte[]> bs = new ArrayList<>();
-                for (final Holder holder : batch) {
-                    bs.add(v.codec().encode(holder.request));
-                    bs.add("\n".getBytes());
+        try {
+            int bufferOfBytes = 0;
+            Codec codec = es.get().version().get().codec();
+            final List<byte[]> bs = new ArrayList<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            List<ByteBuf> byteBufList = new ArrayList<>();
+            for (final Holder holder : batch) {
+                byte[] bytes = codec.encode(holder.request);
+                bs.add(bytes);
+                bs.add("\n".getBytes());
+                bufferOfBytes += bytes.length;
+                if (bufferOfBytes > batchOfBytes) {
+                    final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
+                    byteBufList.add(content);
+                    bs.clear();
                 }
+            }
+            if (CollectionUtils.isNotEmpty(bs)) {
                 final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
-                return es.get().client().execute(rf.bulk().bulk(content))
-                         .aggregate().thenAccept(response -> {
-                        final HttpStatus status = response.status();
-                        if (status != HttpStatus.OK) {
-                            throw new RuntimeException(response.contentUtf8());
-                        }
-                    });
-            } catch (Exception e) {
-                return Exceptions.throwUnsafely(e);
+                byteBufList.add(content);
             }
-        });
-        future.whenComplete((ignored, exception) -> {
-            if (exception != null) {
-                batch.stream().map(it -> it.future)
-                     .forEach(it -> it.completeExceptionally(exception));
-                log.error("Failed to execute requests in bulk", exception);
-            } else {
-                log.debug("Succeeded to execute {} requests in bulk", batch.size());
-                batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
+            for (final ByteBuf content : byteBufList) {
+                CompletableFuture future = es.get().version().thenCompose(v -> {

Review Comment:
   ```suggestion
                   CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
   ```



##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,76 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        futures.stream().map(future -> future.join());
+        semaphore.release();
         lastFlushTS = System.currentTimeMillis();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
+    private List<CompletableFuture<Void>> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
-
         if (batch.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return Collections.emptyList();
         }
-
-        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
-            try {
-                final RequestFactory rf = v.requestFactory();
-                final List<byte[]> bs = new ArrayList<>();
-                for (final Holder holder : batch) {
-                    bs.add(v.codec().encode(holder.request));
-                    bs.add("\n".getBytes());
+        try {
+            int bufferOfBytes = 0;
+            Codec codec = es.get().version().get().codec();
+            final List<byte[]> bs = new ArrayList<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            List<ByteBuf> byteBufList = new ArrayList<>();
+            for (final Holder holder : batch) {
+                byte[] bytes = codec.encode(holder.request);
+                bs.add(bytes);
+                bs.add("\n".getBytes());
+                bufferOfBytes += bytes.length;
+                if (bufferOfBytes > batchOfBytes) {

Review Comment:
   ```suggestion
                   if (bufferOfBytes >= batchOfBytes) {
   ```



##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,76 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        futures.stream().map(future -> future.join());
+        semaphore.release();
         lastFlushTS = System.currentTimeMillis();
     }
 
-    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
+    private List<CompletableFuture<Void>> doFlush(final List<Holder> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
-
         if (batch.isEmpty()) {
-            return CompletableFuture.completedFuture(null);
+            return Collections.emptyList();
         }
-
-        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
-            try {
-                final RequestFactory rf = v.requestFactory();
-                final List<byte[]> bs = new ArrayList<>();
-                for (final Holder holder : batch) {
-                    bs.add(v.codec().encode(holder.request));
-                    bs.add("\n".getBytes());
+        try {
+            int bufferOfBytes = 0;
+            Codec codec = es.get().version().get().codec();
+            final List<byte[]> bs = new ArrayList<>();
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            List<ByteBuf> byteBufList = new ArrayList<>();
+            for (final Holder holder : batch) {
+                byte[] bytes = codec.encode(holder.request);
+                bs.add(bytes);
+                bs.add("\n".getBytes());
+                bufferOfBytes += bytes.length;

Review Comment:
   You need to also add the `\n`
   
   ```suggestion
                   bufferOfBytes += bytes.length + 1;
   ```



##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,76 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        futures.stream().map(future -> future.join());
+        semaphore.release();

Review Comment:
   ```suggestion
   
           final List<CompletableFuture<Void>> futures = doFlush(batch);
           final CompletableFuture<Void> future = CompletableFuture
               .allOf(futures.toArray(new CompletableFuture[futures.size()]));
           future.whenComplete((v, t) -> semaphore.release());
           future.join();
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072167811


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > I use elasticsearch in the production environment,I solved it simply as above,but this problem may also occur with other storage type
   
   i fix it ,please check again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071937174


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   I use elasticsearch in the production environment,I solved it simply as above,but this problem may also occur with other storage type



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071932462


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   yes , this mechanism is to resolve too large an HTTP body



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071941923


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > Then, changing `BulkProcessor#doFlush` should be good enough and easy enough. Could you try that?
   
   ok ,i try it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1073245030


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   In a production environment, 5m is about 3000 per dimension. I think in extreme cases, with too many tags, it might cause oversized. It is reasonable to set 10m, https://cloud.tencent.com/document/product/845/56274



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1073252631


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   I set the request body to 5mB, about 3000 docs on average



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1385415425

   > > [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.
   > 
   > @chenhaipeng I updated the PR title, AFAIK, both record(trace/log) and metrics are using this. Are you aware of this? Your PR title was about `record` only.
   
   yes ,both record(trace/log) and metrics are using this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071886780


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   I am feeling we want this feature, we should somehow work with ElasticSearch client, rather than run serialization twice for a bulk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071929341


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   I think `BulkProcessor#doFlush` includes all logic you need. You should have everything you need for. I don't think you need to unpack from there. You only have to do is adding the mechanism to build one request(current) or more.
   
   From my understanding, this mechanism is to resolve too large an HTTP body issue, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "chenhaipeng (via GitHub)" <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1096904387


##########
docs/en/setup/backend/configuration-vocabulary.md:
##########
@@ -106,6 +106,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
 | -                       | -             | superDatasetIndexReplicasNumber                                                                                                                                          | Represents the replicas number in the super size dataset record index.                                                                                                                                                                                                                                                                                                                                                                                          | SW_STORAGE_ES_SUPER_DATASET_INDEX_REPLICAS_NUMBER | 0                                                                                            |
 | -                       | -             | indexTemplateOrder                                                                                                                                                       | The order of index template.                                                                                                                                                                                                                                                                                                                                                                                                                                    | SW_STORAGE_ES_INDEX_TEMPLATE_ORDER                | 0                                                                                            |
 | -                       | -             | bulkActions                                                                                                                                                              | Async bulk size of the record data batch execution.                                                                                                                                                                                                                                                                                                                                                                                                             | SW_STORAGE_ES_BULK_ACTIONS                        | 5000                                                                                         |
+| -                       | -              | batchOfBytes                                                                                                                                                             | A threshold to control the max body size of ElasticSearch Bulk flush.                                                                                                                                                                                                                                                                                                                           | SW_STORAGE_ES_BATCH_OF_BYTES                      | 5242880                                                                                      |

Review Comment:
   ok ,i check again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072174023


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -129,57 +135,79 @@ public void flush() {
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-
-        final CompletableFuture<Void> flush = doFlush(batch);
-        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
-        flush.join();
-
+        final List<CompletableFuture<Void>> futures = doFlush(batch);
+        for (final CompletableFuture<Void> future : futures) {
+            future.join();
+        }
+        futures.stream().map(future -> future.join());

Review Comment:
   Are these duplicated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072211637


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > > yes , this mechanism is to resolve too large an HTTP body
   > 
   > As you set the default value to 5M, could you share what is the size limitation?
   
   there is elastic http limit ,Defaults to 100mb, I think it should not exceed 10m. Kafka has a similar limit of 10m.
   https://www.elastic.co/guide/en/elasticsearch/reference/7.17/modules-network.html#http-settings



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1072211171


##########
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java:
##########
@@ -88,6 +88,8 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
      * @since 8.7.0 This setting affects all traces/logs/metrics/metadata flush policy.
      */
     private int bulkActions = 5000;
+
+    private int batchOfBytes = 1024 * 1024 * 5;

Review Comment:
   Please update the `application.yml` comments accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
chenhaipeng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071922400


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > i think so , where is it better to control sizeOfBytes?
   
   If a protection mechanism is added when sending, then an unpacking may be required, maybe which increase the complexity 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] wu-sheng commented on a diff in pull request #10287: Elasticsearch storage :Support batchOfBytes configuration to set up the batch bytes size of the record data.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on code in PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#discussion_r1071945211


##########
oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java:
##########
@@ -94,14 +100,16 @@ public CompletableFuture<Void> add(UpdateRequest request) {
     private CompletableFuture<Void> internalAdd(Object request) {
         requireNonNull(request, "request");
         final CompletableFuture<Void> f = new CompletableFuture<>();
+        int len = toByteArray(request).length;

Review Comment:
   > I use elasticsearch in the production environment,I solved it simply as above,but this problem may also occur with other storage type
   
   That is fine. We don't need to resolve an issue that doesn't happen(at least in some known scenarios). Let's focus on fixing this on ElasticSearch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] chenhaipeng commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "chenhaipeng (via GitHub)" <gi...@apache.org>.
chenhaipeng commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1411592040

   > @chenhaipeng Have you been back on this?
   
   yes,i change the default value to 10M


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [skywalking] kezhenxu94 commented on pull request #10287: [Elasticsearch] Support batchOfBytes configuration to limit size of bulk flush.

Posted by "kezhenxu94 (via GitHub)" <gi...@apache.org>.
kezhenxu94 commented on PR #10287:
URL: https://github.com/apache/skywalking/pull/10287#issuecomment-1411720729

   @chenhaipeng my review comments are not addressed yet :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org