You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/28 17:08:52 UTC

[GitHub] [pinot] KKcorps opened a new pull request, #8609: Add support for Retry in Kinesis Stream producer

KKcorps opened a new pull request, #8609:
URL: https://github.com/apache/pinot/pull/8609

   Kinesis producer can fail either due throughput limit exceeded or stream becoming INACTIVE due to variety of factors such as resharding.
   
   This allows user to add two properties to enable retry for producer. Currently only `FixedDelayRetryPolicy` is used.
   
   `num_retries` - Allows to specify the max number of attempts to make
   
   `retry_delay_millis` - delay in milliseconds between retry attempts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8609:
URL: https://github.com/apache/pinot/pull/8609#issuecomment-1112511378

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8609](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e282303) into [master](https://codecov.io/gh/apache/pinot/commit/232b946419d05b785610e9b2daf7467f5f8bee82?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (232b946) will **decrease** coverage by `1.64%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head e282303 differs from pull request most recent head da87aa0. Consider uploading reports for the commit da87aa0 to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8609      +/-   ##
   ============================================
   - Coverage     70.68%   69.04%   -1.65%     
   - Complexity     4321     4322       +1     
   ============================================
     Files          1693     1693              
     Lines         88795    88826      +31     
     Branches      13472    13474       +2     
   ============================================
   - Hits          62767    61327    -1440     
   - Misses        21639    23189    +1550     
   + Partials       4389     4310      -79     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `25.74% <0.00%> (-0.05%)` | :arrow_down: |
   | unittests1 | `66.95% <ø> (ø)` | |
   | unittests2 | `14.18% <0.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...gin/stream/kinesis/server/KinesisDataProducer.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9zZXJ2ZXIvS2luZXNpc0RhdGFQcm9kdWNlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/minion/exception/TaskCancelledException.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtbWluaW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9taW5pb24vZXhjZXB0aW9uL1Rhc2tDYW5jZWxsZWRFeGNlcHRpb24uamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...nverttorawindex/ConvertToRawIndexTaskExecutor.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydHRvcmF3aW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrRXhlY3V0b3IuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <0.00%> (-94.74%)` | :arrow_down: |
   | [...plugin/segmentuploader/SegmentUploaderDefault.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zZWdtZW50LXVwbG9hZGVyL3Bpbm90LXNlZ21lbnQtdXBsb2FkZXItZGVmYXVsdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3NlZ21lbnR1cGxvYWRlci9TZWdtZW50VXBsb2FkZXJEZWZhdWx0LmphdmE=) | `0.00% <0.00%> (-87.10%)` | :arrow_down: |
   | [.../transform/function/MapValueTransformFunction.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci90cmFuc2Zvcm0vZnVuY3Rpb24vTWFwVmFsdWVUcmFuc2Zvcm1GdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (-85.30%)` | :arrow_down: |
   | [...ot/common/messages/RoutingTableRebuildMessage.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvUm91dGluZ1RhYmxlUmVidWlsZE1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-81.82%)` | :arrow_down: |
   | [...verttorawindex/ConvertToRawIndexTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3MvY29udmVydHRvcmF3aW5kZXgvQ29udmVydFRvUmF3SW5kZXhUYXNrR2VuZXJhdG9yLmphdmE=) | `5.45% <0.00%> (-80.00%)` | :arrow_down: |
   | [...ache/pinot/common/lineage/SegmentLineageUtils.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbGluZWFnZS9TZWdtZW50TGluZWFnZVV0aWxzLmphdmE=) | `22.22% <0.00%> (-77.78%)` | :arrow_down: |
   | [...ore/startree/executor/StarTreeGroupByExecutor.java](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zdGFydHJlZS9leGVjdXRvci9TdGFyVHJlZUdyb3VwQnlFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (-77.78%)` | :arrow_down: |
   | ... and [127 more](https://codecov.io/gh/apache/pinot/pull/8609/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [232b946...da87aa0](https://codecov.io/gh/apache/pinot/pull/8609?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] navina commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861193313


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
-            .build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));

Review Comment:
   ok got it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861185288


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
-            .build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));

Review Comment:
   Yep, the retryPolicy.attempt throws the exception. It is available in `pinot-spi` `org.apache.pinot.spi.utils.retry.RetryPolicy`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861185288


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
-            .build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));

Review Comment:
   Yep, the retryPolicy.attempt throws the exception. It is already used in Pinot



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861191759


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps commented on PR #8609:
URL: https://github.com/apache/pinot/pull/8609#issuecomment-1112530756

   Not an external user but e.g. we use this producer in our GithubEventsProducer as well as in test cases and we may want to tweak these value.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] navina commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861180460


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);
+    }
   }
 
   @Override
   public void produce(String topic, byte[] key, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new String(key))
-            .build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, key, payload));

Review Comment:
   how is exception thrown in the lambda handled? looks like putRecord already has a try-catch within the method. When it returns false, does this `_retryPolicy.attempt` throw an exception? 



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);

Review Comment:
   the log line seems misleading. we don't know why the attempts to push record into stream failed. is retry exhaustion the only reason to reach this point? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] navina commented on a diff in pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #8609:
URL: https://github.com/apache/pinot/pull/8609#discussion_r861193147


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java:
##########
@@ -54,42 +71,56 @@ public void init(Properties props) {
             .credentialsProvider(getLocalAWSCredentials(props))
             .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       } else {
-        kinesisClientBuilder =
-            KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
-                .credentialsProvider(DefaultCredentialsProvider.create())
-                .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
+        kinesisClientBuilder = KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+            .credentialsProvider(DefaultCredentialsProvider.create())
+            .httpClientBuilder(new ApacheSdkHttpService().createHttpClientBuilder());
       }
 
       if (props.containsKey(ENDPOINT)) {
         String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
         try {
           kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new URI(kinesisEndpoint));
         } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: "
-              + kinesisEndpoint, e);
+          throw new IllegalArgumentException("URI syntax is not correctly specified for endpoint: " + kinesisEndpoint,
+              e);
         }
       }
 
       _kinesisClient = kinesisClientBuilder.build();
+
+      int numRetries = Integer.parseInt(props.getProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES));
+      long retryDelayMs = Long.parseLong(props.getProperty(RETRY_DELAY_MILLIS, DEFAULT_RETRY_DELAY_MILLIS));
+      _retryPolicy = new FixedDelayRetryPolicy(numRetries, retryDelayMs);
     } catch (Exception e) {
       _kinesisClient = null;
     }
   }
 
   @Override
   public void produce(String topic, byte[] payload) {
-    PutRecordRequest putRecordRequest =
-        PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
-            .partitionKey(UUID.randomUUID().toString()).build();
-    PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
+    try {
+      _retryPolicy.attempt(() -> putRecord(topic, null, payload));
+    } catch (Exception e) {
+      LOGGER.error("Retries exhausted while pushing record in stream {}", topic);

Review Comment:
   ty! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps commented on pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps commented on PR #8609:
URL: https://github.com/apache/pinot/pull/8609#issuecomment-1112454699

   @navina 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] KKcorps merged pull request #8609: Add support for Retry in Kinesis Stream producer

Posted by GitBox <gi...@apache.org>.
KKcorps merged PR #8609:
URL: https://github.com/apache/pinot/pull/8609


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org