Posted to commits@samza.apache.org by "shekhars-li (via GitHub)" <gi...@apache.org> on 2023/01/24 22:38:04 UTC

[GitHub] [samza] shekhars-li opened a new pull request, #1650: Make blob store related retry tunables configurable

shekhars-li opened a new pull request, #1650:
URL: https://github.com/apache/samza/pull/1650

   What:
   - Moved tunables related to blob store retry logic behind configuration.
   - Tunables and their new configuration keys are:  
       - Max retries -> "blob.store.retry.policy.max.retries"
       - Max retry duration -> "blob.store.retry.policy.max.retries.duration.ms"
       - Backoff delay -> "blob.store.retry.policy.backoff.delay.millis"
       - Max backoff delay -> "blob.store.retry.policy.backoff.max.delay.millis"
       - Backoff delay factor -> "blob.store.retry.policy.backoff.delay.factor"
   
   Why:
   - These tunables were hard-coded constants, so they could not be adjusted to tune jobs for better failure resilience. 
   
   Tests:
   - Updated unit tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086054327


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -23,19 +23,56 @@
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
+  
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String DEFAULT_BLOB_STORE_MANAGER_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreManagerFactory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  public static final String DEFAULT_BLOB_STORE_ADMIN_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreAdminFactory";
+  // Configs related to retry policy of blob stores
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES = PREFIX + "retry.policy.max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_BLOB_STORE_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES_DURATION_MILLIS = PREFIX + "retry.policy.max.retires.duration.millis";

Review Comment:
   Should also be max retry duration (not retries).


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086025969


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -23,19 +23,56 @@
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
+  
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String DEFAULT_BLOB_STORE_MANAGER_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreManagerFactory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  public static final String DEFAULT_BLOB_STORE_ADMIN_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreAdminFactory";
+  // Configs related to retry policy of blob stores
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES = PREFIX + "retry.policy.max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_BLOB_STORE_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES_DURATION_MILLIS = PREFIX + "retry.policy.max.retires.duration.millis";

Review Comment:
   Declare a new variable RETRY_POLICY_PREFIX = PREFIX + "retry.policy." instead of duplicating boilerplate.
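A minimal sketch of this suggestion, using key names that appear in a later revision of this PR. BlobStoreConfigKeys is a hypothetical holder class used only for illustration; in the PR these constants live in BlobStoreConfig.

```java
// Hypothetical holder class; illustrates the shared-prefix layout only.
final class BlobStoreConfigKeys {
  private static final String PREFIX = "blob.store.";
  // Declared once, so every retry-policy key is built from the same prefix.
  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";

  static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
  static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
  static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";

  private BlobStoreConfigKeys() {
    // Constants only; not instantiable.
  }
}
```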


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092592037


##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -82,17 +86,21 @@ public class BlobStoreUtil {
 
   private final BlobStoreManager blobStoreManager;
   private final ExecutorService executor;
+  private final BlobStoreConfig blobStoreConfig;
   private final BlobStoreBackupManagerMetrics backupMetrics;
   private final BlobStoreRestoreManagerMetrics restoreMetrics;
   private final SnapshotIndexSerde snapshotIndexSerde;
+  private final RetryPolicy<Object> retryPolicy;
 
-  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
+  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, BlobStoreConfig blobStoreConfig,
       BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) {
     this.blobStoreManager = blobStoreManager;
     this.executor = executor;
+    this.blobStoreConfig = blobStoreConfig;
     this.backupMetrics = backupMetrics;
     this.restoreMetrics = restoreMetrics;
     this.snapshotIndexSerde = new SnapshotIndexSerde();
+    this.retryPolicy = buildRetryPolicyFromConfig();

Review Comment:
   Don't access class fields that are set up in the constructor from methods called in the constructor. This is prone to breaking if the field assignments are reordered. Pass the variable explicitly instead. This is generally good practice (e.g., it makes the method easier to unit test).
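A minimal sketch of the hazard, using a plain Map in place of BlobStoreConfig. FragileRetryHelper, SaferRetryHelper, and the "max.retries" key are hypothetical: the first helper reads a field that its (reordered) constructor has not assigned yet, while the second passes the config explicitly to a static method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical classes illustrating the review comment; not Samza code.
final class FragileRetryHelper {
  private final int maxRetries;
  private final Map<String, String> config;

  FragileRetryHelper(Map<String, String> config) {
    // readMaxRetries() runs before this.config is assigned, so it sees null.
    this.maxRetries = readMaxRetries();
    this.config = config;
  }

  private int readMaxRetries() {
    // Throws NullPointerException because this.config is still null at this point.
    return Integer.parseInt(config.getOrDefault("max.retries", "-1"));
  }
}

final class SaferRetryHelper {
  private final Map<String, String> config;
  private final int maxRetries;

  SaferRetryHelper(Map<String, String> config) {
    this.config = config;
    // The dependency is passed explicitly, so field assignment order no longer matters.
    this.maxRetries = readMaxRetries(config);
  }

  // Static and parameterized: can be unit tested without constructing the class.
  static int readMaxRetries(Map<String, String> config) {
    return Integer.parseInt(config.getOrDefault("max.retries", "-1"));
  }

  int maxRetries() {
    return maxRetries;
  }

  static SaferRetryHelper withDefaults() {
    return new SaferRetryHelper(new HashMap<>());
  }
}
```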


[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092689434


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -19,23 +19,75 @@
 
 package org.apache.samza.config;
 
+import java.time.temporal.ChronoUnit;
+import org.apache.samza.util.RetryPolicyConfig;
+
+
 /**
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 15 * 60 * 1000; // 15 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;

Review Comment:
   This is a little over 5 minutes. With exponential backoff, this max delay applies to the last retry, so the retries should span about 10 minutes in total.  
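The arithmetic behind this comment can be checked with a simplified model, assuming each delay is the previous one multiplied by the factor of 5 and capped at 312,500 ms, and ignoring jitter and the time each attempt takes (this is not Failsafe's actual scheduling logic). Under that model, a 10-minute budget fits six full delays (100 ms, 500 ms, 2.5 s, 12.5 s, 62.5 s, 312.5 s), about 6.5 minutes of cumulative waiting, and a 15-minute budget fits a seventh. BackoffSchedule is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of capped exponential backoff; jitter and attempt execution time are ignored.
final class BackoffSchedule {
  static List<Long> delaysWithin(long initialDelayMillis, long maxDelayMillis, double factor,
      long maxDurationMillis) {
    if (initialDelayMillis <= 0 || factor < 1) {
      throw new IllegalArgumentException("initial delay must be > 0 and factor >= 1");
    }
    List<Long> delays = new ArrayList<>();
    long delay = initialDelayMillis;
    long elapsed = 0;
    // Keep adding full delays while the cumulative wait stays within the max duration.
    while (elapsed + delay <= maxDurationMillis) {
      delays.add(delay);
      elapsed += delay;
      delay = Math.min((long) (delay * factor), maxDelayMillis);
    }
    return delays;
  }

  static long total(List<Long> delays) {
    long sum = 0;
    for (long d : delays) {
      sum += d;
    }
    return sum;
  }
}
```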


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086022930


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -23,19 +23,56 @@
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
+  
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String DEFAULT_BLOB_STORE_MANAGER_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreManagerFactory";

Review Comment:
   Remove LinkedIn-internal (Ambry) details.


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092597716


##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -629,4 +637,15 @@ private static Predicate<Throwable> isCauseNonRetriable() {
       return unwrapped != null && !RetriableException.class.isAssignableFrom(unwrapped.getClass());
     };
   }
+
+  private RetryPolicy<Object> buildRetryPolicyFromConfig() {
+    RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
+        .withMaxRetries(blobStoreConfig.getRetryPolicyMaxRetries())
+        .withBackoff(blobStoreConfig.getRetryPolicyBackoffDelayMillis(),
+            blobStoreConfig.getRetryPolicyBackoffMaxDelayMillis(), ChronoUnit.MILLIS,
+            blobStoreConfig.getRetryPolicyBackoffDelayFactor())
+        .withMaxDuration(Duration.ofMinutes(blobStoreConfig.getRetryPolicyMaxRetriesDurationMillis()))

Review Comment:
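   Minor: Why convert this to minutes? The config value is in milliseconds, so Duration.ofMinutes(...) would treat it as minutes; Duration.ofMillis(...) seems to be what is intended.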
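Because the accessor name says the value is in milliseconds, wrapping it in Duration.ofMinutes would inflate the limit by a factor of 60,000. A quick sketch using only java.time.Duration and the 10-minute default from a later revision of BlobStoreConfig; DurationUnits is an illustrative class name.

```java
import java.time.Duration;

final class DurationUnits {
  // 10 minutes expressed in milliseconds, as in the revised default.
  static final long MAX_RETRY_DURATION_MILLIS = 10L * 60 * 1000;

  // Misreads the millisecond value as minutes: 600,000 minutes, i.e. more than 416 days.
  static Duration misreadAsMinutes() {
    return Duration.ofMinutes(MAX_RETRY_DURATION_MILLIS);
  }

  // Reads the value in its actual unit: 10 minutes.
  static Duration readAsMillis() {
    return Duration.ofMillis(MAX_RETRY_DURATION_MILLIS);
  }
}
```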


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086022385


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,16 +144,16 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor) {
-    Duration maxDuration = Duration.ofMinutes(10);
-
-    RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
-        .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
-        .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
-        .withMaxDuration(maxDuration)
-        .abortOn(abortRetries) // stop retrying if predicate returns true
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
+      ExecutorService executor, BlobStoreConfig blobStoreConfig) {

Review Comment:
   A use-case-agnostic util class should not take use-case-specific parameters (here, the blob store config).
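One way to follow this is to have the utility accept a generic settings object and let the blob store code translate its own config into it. The sketch below is synchronous and uses a hand-written retry loop with capped exponential backoff rather than Failsafe, purely to show the shape of the API; RetrySettings and GenericRetry are hypothetical names, and a max-duration limit is omitted for brevity (so maxRetries = -1 with a permanently failing action would retry forever).

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, use-case-agnostic retry settings; callers map their own config onto this.
final class RetrySettings {
  final int maxRetries; // -1 means no limit on the retry count
  final long backoffDelayMillis;
  final long backoffMaxDelayMillis;
  final double backoffFactor;

  RetrySettings(int maxRetries, long backoffDelayMillis, long backoffMaxDelayMillis, double backoffFactor) {
    this.maxRetries = maxRetries;
    this.backoffDelayMillis = backoffDelayMillis;
    this.backoffMaxDelayMillis = backoffMaxDelayMillis;
    this.backoffFactor = backoffFactor;
  }
}

final class GenericRetry {
  // Knows nothing about blob stores: only the action, an abort predicate, and generic settings.
  static <T> T executeWithRetries(Supplier<T> action, Predicate<RuntimeException> abortRetries,
      RetrySettings settings) {
    long delay = settings.backoffDelayMillis;
    int retries = 0;
    while (true) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        boolean retriesExhausted = settings.maxRetries >= 0 && retries >= settings.maxRetries;
        if (abortRetries.test(e) || retriesExhausted) {
          throw e;
        }
        try {
          Thread.sleep(delay);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e; // stop retrying if interrupted and surface the last failure
        }
        delay = Math.min((long) (delay * settings.backoffFactor), settings.backoffMaxDelayMillis);
        retries++;
      }
    }
  }
}
```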


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092601390


##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -629,4 +637,15 @@ private static Predicate<Throwable> isCauseNonRetriable() {
       return unwrapped != null && !RetriableException.class.isAssignableFrom(unwrapped.getClass());
     };
   }
+
+  private RetryPolicy<Object> buildRetryPolicyFromConfig() {

Review Comment:
   Consider moving this method to BlobStoreConfig and returning a RetryPolicy/RetryPolicyConfig directly (i.e., instead of exposing each config through its own accessor and constructing this object later).
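A minimal sketch of that shape, assuming a plain Map in place of Samza's MapConfig and using the key suffixes and defaults from a later revision of BlobStoreConfig. BlobStoreRetrySettings and BlobStoreConfigSketch are hypothetical names; the point is a single accessor that returns one immutable object instead of one getter per tunable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable value object grouping all retry tunables.
final class BlobStoreRetrySettings {
  final int maxRetries;
  final long maxRetryDurationMillis;
  final long backoffDelayMillis;
  final long backoffMaxDelayMillis;
  final int backoffDelayFactor;

  BlobStoreRetrySettings(int maxRetries, long maxRetryDurationMillis, long backoffDelayMillis,
      long backoffMaxDelayMillis, int backoffDelayFactor) {
    this.maxRetries = maxRetries;
    this.maxRetryDurationMillis = maxRetryDurationMillis;
    this.backoffDelayMillis = backoffDelayMillis;
    this.backoffMaxDelayMillis = backoffMaxDelayMillis;
    this.backoffDelayFactor = backoffDelayFactor;
  }
}

// Hypothetical stand-in for BlobStoreConfig, backed by a plain Map.
final class BlobStoreConfigSketch {
  private static final String RETRY_POLICY_PREFIX = "blob.store.retry.policy.";
  private final Map<String, String> config;

  BlobStoreConfigSketch(Map<String, String> config) {
    this.config = new HashMap<>(config); // defensive copy
  }

  // One accessor builds the whole retry configuration; defaults mirror the revised PR.
  BlobStoreRetrySettings getRetrySettings() {
    return new BlobStoreRetrySettings(
        Integer.parseInt(get("max.retries", "-1")),
        Long.parseLong(get("max.retry.duration.millis", "600000")),
        Long.parseLong(get("backoff.delay.millis", "100")),
        Long.parseLong(get("backoff.max.delay.millis", "312500")),
        Integer.parseInt(get("backoff.delay.factor", "5")));
  }

  private String get(String suffix, String defaultValue) {
    return config.getOrDefault(RETRY_POLICY_PREFIX + suffix, defaultValue);
  }
}
```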


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092590749


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
+  public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
+  public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
+  public static final String RETRY_POLICY_JITTER_FACTOR_MILLIS =  RETRY_POLICY_PREFIX + "jitter.factor.millis";
+  // random retry delay between -100 to 100 millisecond
+  public static final long DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS = 100;

Review Comment:
   This is a factor, not an amount, right? Is this defaulting to a factor of 100?


[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092592289


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
+  public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
+  public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
+  public static final String RETRY_POLICY_JITTER_FACTOR_MILLIS =  RETRY_POLICY_PREFIX + "jitter.factor.millis";
+  // random retry delay between -100 to 100 millisecond
+  public static final long DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS = 100;

Review Comment:
   This is actually an amount, e.g. a default of 100 ms. It should be named just jitter instead of jitter factor. Updating it. 


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092647543


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -144,31 +141,21 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
       Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor) {
-    Duration maxDuration = Duration.ofMinutes(10);
+      ExecutorService executor, RetryPolicyConfig retryPolicyConfig) {
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
-        .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
-        .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
-        .withMaxDuration(maxDuration)
-        .withJitter(Duration.ofMillis(100))
-        .abortOn(abortRetries) // stop retrying if predicate returns true
-        .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
-            opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
-
-    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
-  }
-
-  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
-
-    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)
+        .withMaxRetries(retryPolicyConfig.getMaxRetries())

Review Comment:
   Nvm, I saw below that this is a custom class. It looks like there is an existing RetryPolicyConfig class in Failsafe [1], but it might only be in a newer version. Check if it's possible to upgrade. Otherwise LGTM for the time being.
   
   [1] https://failsafe.dev/javadoc/core/dev/failsafe/RetryPolicy.html#builder-dev.failsafe.RetryPolicyConfig-


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092599490


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
+  public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
+  public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
+  public static final String RETRY_POLICY_JITTER_FACTOR_MILLIS =  RETRY_POLICY_PREFIX + "jitter.factor.millis";
+  // random retry delay between -100 to 100 millisecond
+  public static final long DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS = 100;

Review Comment:
   Prefer using a factor instead of a fixed amount (unlike the current actual impl), since it scales better with longer retry durations. E.g. a factor of 0.1 to 0.2 works even at a retry interval of ~5 min.
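To make the scaling argument concrete: at the 312,500 ms max delay, a fixed 100 ms jitter varies the wait by only about 0.03%, while a factor of 0.1 varies it by up to 31,250 ms (about 31 s). The sketch below assumes a uniform offset in a symmetric window around the delay; Failsafe's RetryPolicy has withJitter overloads for both a Duration and a double factor, but this code does not call Failsafe, and JitterSketch is a hypothetical name.

```java
import java.util.Random;

// Compares fixed-amount and factor-based jitter under a uniform, symmetric window assumption.
final class JitterSketch {
  // Fixed jitter: delay +/- jitterMillis, regardless of how long the delay is.
  static long withFixedJitter(long delayMillis, long jitterMillis, Random random) {
    double offset = (random.nextDouble() * 2 - 1) * jitterMillis;
    return delayMillis + (long) offset;
  }

  // Factor jitter: delay +/- (delay * jitterFactor), so the window grows with the delay.
  static long withFactorJitter(long delayMillis, double jitterFactor, Random random) {
    double offset = (random.nextDouble() * 2 - 1) * delayMillis * jitterFactor;
    return delayMillis + (long) offset;
  }
}
```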


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092595608


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,20 +143,32 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
       ExecutorService executor) {
     Duration maxDuration = Duration.ofMinutes(10);
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
         .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
         .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
         .withMaxDuration(maxDuration)
+        .withJitter(Duration.ofMillis(100))
         .abortOn(abortRetries) // stop retrying if predicate returns true
         .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
             opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
 
-    return Failsafe.with(retryPolicy).with(executor).getStageAsync(action::get);
+    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
+  }
+
+  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
+      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
+
+    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)

Review Comment:
   Does this mutate the existing retry policy or create and return a new one?


[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092597786


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,20 +143,32 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
       ExecutorService executor) {
     Duration maxDuration = Duration.ofMinutes(10);
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
         .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
         .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
         .withMaxDuration(maxDuration)
+        .withJitter(Duration.ofMillis(100))
         .abortOn(abortRetries) // stop retrying if predicate returns true
         .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
             opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
 
-    return Failsafe.with(retryPolicy).with(executor).getStageAsync(action::get);
+    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
+  }
+
+  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
+      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
+
+    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)

Review Comment:
   It mutates the existing retry policy.



[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092601277


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins

Review Comment:
   Thanks, updating it to 15 mins. Given the max delay of ~5 mins, it should take more than 10 mins to exhaust all retries.



[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092654460


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -144,31 +141,21 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
       Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor) {
-    Duration maxDuration = Duration.ofMinutes(10);
+      ExecutorService executor, RetryPolicyConfig retryPolicyConfig) {
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
-        .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
-        .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
-        .withMaxDuration(maxDuration)
-        .withJitter(Duration.ofMillis(100))
-        .abortOn(abortRetries) // stop retrying if predicate returns true
-        .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
-            opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
-
-    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
-  }
-
-  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
-
-    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)
+        .withMaxRetries(retryPolicyConfig.getMaxRetries())

Review Comment:
   Yes, RetryPolicyConfig is not yet available in the [latest release version](https://repo.maven.apache.org/maven2/net/jodah/failsafe/2.4.4/) of the Failsafe library.



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086024120


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -23,19 +23,56 @@
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
+  
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String DEFAULT_BLOB_STORE_MANAGER_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreManagerFactory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  public static final String DEFAULT_BLOB_STORE_ADMIN_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreAdminFactory";
+  // Configs related to retry policy of blob stores
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES = PREFIX + "retry.policy.max.retries";

Review Comment:
   This class is BlobStoreConfig, so you can remove the "blob store" prefix from variable names and accessors.



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092643747


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -76,7 +80,14 @@ public int getRetryPolicyBackoffDelayFactor() {
     return getInt(RETRY_POLICY_BACKOFF_DELAY_FACTOR, DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR);
   }
 
-  public long getRetryPolicyJitterFactorMillis() {
-    return getLong(RETRY_POLICY_JITTER_FACTOR_MILLIS, DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS);
+  public double getRetryPolicyJitterFactor() {

Review Comment:
   Remove unnecessary accessor.



[GitHub] [samza] shekhars-li commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "shekhars-li (via GitHub)" <gi...@apache.org>.
shekhars-li commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092596582


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,20 +143,32 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,

Review Comment:
   Not used anymore. I left it in case someone wanted to use the default retry policy; removing it for now.



##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -82,17 +86,21 @@ public class BlobStoreUtil {
 
   private final BlobStoreManager blobStoreManager;
   private final ExecutorService executor;
+  private final BlobStoreConfig blobStoreConfig;
   private final BlobStoreBackupManagerMetrics backupMetrics;
   private final BlobStoreRestoreManagerMetrics restoreMetrics;
   private final SnapshotIndexSerde snapshotIndexSerde;
+  private final RetryPolicy<Object> retryPolicy;
 
-  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
+  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, BlobStoreConfig blobStoreConfig,
       BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) {
     this.blobStoreManager = blobStoreManager;
     this.executor = executor;
+    this.blobStoreConfig = blobStoreConfig;
     this.backupMetrics = backupMetrics;
     this.restoreMetrics = restoreMetrics;
     this.snapshotIndexSerde = new SnapshotIndexSerde();
+    this.retryPolicy = buildRetryPolicyFromConfig();

Review Comment:
   Thank you! Updating it. 



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092598919


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins

Review Comment:
   Let's change this to > 10 min. IIRC, with the current policy defaults, retries only happen up to ~6 minutes in, and the next attempt would already exceed this 10 min max.
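To make the arithmetic in this exchange concrete, here is a small stdlib-only sketch that replays the default backoff (100 ms initial delay, factor 5, 312500 ms cap) and lists the elapsed time at which each retry would start under a given max duration. It is not Failsafe's scheduler: it assumes every attempt fails instantly, ignores jitter, and assumes a retry is only scheduled if it would start within the max duration; Failsafe's exact cutoff rule may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {

  /**
   * Elapsed time (ms since the first attempt) at which each retry would start.
   * Sketch assumptions: attempts fail instantly, no jitter, and a retry is scheduled
   * only if it would start no later than maxDurationMs.
   */
  static List<Long> retryStartTimes(long initialDelayMs, long maxDelayMs, int factor, long maxDurationMs) {
    if (initialDelayMs <= 0) {
      throw new IllegalArgumentException("initialDelayMs must be positive");
    }
    List<Long> starts = new ArrayList<>();
    long elapsed = 0;
    long delay = initialDelayMs;
    while (elapsed + delay <= maxDurationMs) {
      elapsed += delay;
      starts.add(elapsed);
      delay = Math.min(delay * factor, maxDelayMs); // exponential growth, capped at maxDelayMs
    }
    return starts;
  }

  public static void main(String[] args) {
    for (long maxMinutes : new long[] {10, 15}) {
      List<Long> starts = retryStartTimes(100, 312_500, 5, maxMinutes * 60 * 1000);
      System.out.println("max " + maxMinutes + " min -> retries start at (ms): " + starts);
    }
  }
}
```

With a 10-minute cap this yields retries at 100, 600, 3100, 15600, 78100 and 390600 ms; the last one is at about 6.5 min, and the next would start at 703100 ms (about 11.7 min), past the cap. With a 15-minute cap that seventh retry fits, which is consistent with moving the default above 10 minutes.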



[GitHub] [samza] prateekm merged pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm merged PR #1650:
URL: https://github.com/apache/samza/pull/1650


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092604327


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,20 +143,32 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
       ExecutorService executor) {
     Duration maxDuration = Duration.ofMinutes(10);
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
         .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
         .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
         .withMaxDuration(maxDuration)
+        .withJitter(Duration.ofMillis(100))
         .abortOn(abortRetries) // stop retrying if predicate returns true
         .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
             opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
 
-    return Failsafe.with(retryPolicy).with(executor).getStageAsync(action::get);
+    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
+  }
+
+  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
+      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
+
+    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)

Review Comment:
   Looking at the impl, RetryPolicy is a mutable object, so this is not safe to do. Please fix.
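The concern can be shown without Failsafe itself. The sketch below uses a hypothetical `MutablePolicy` class whose fluent setters mutate the instance and return `this`, matching how this thread describes `RetryPolicy.abortOn`. It shows that "decorating" a shared instance per call changes it for every caller, and that building a fresh instance per call avoids this. `MutablePolicy`, `withAbortPredicate`, `decorateShared` and `freshPolicy` are illustrative names, not Failsafe or Samza APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SharedPolicyDemo {

  /** Hypothetical mutable policy: fluent setters mutate this instance and return it. */
  static final class MutablePolicy {
    final List<Predicate<Throwable>> abortPredicates = new ArrayList<>();
    long maxDurationMs;

    MutablePolicy withMaxDurationMs(long ms) {
      this.maxDurationMs = ms;
      return this;
    }

    MutablePolicy withAbortPredicate(Predicate<Throwable> predicate) {
      abortPredicates.add(predicate); // mutates shared state; no copy is made
      return this;
    }
  }

  /** Unsafe: each call adds another predicate to the same shared instance. */
  static MutablePolicy decorateShared(MutablePolicy shared, Predicate<Throwable> abort) {
    return shared.withAbortPredicate(abort);
  }

  /** Safer: build a new instance per call from plain, immutable inputs. */
  static MutablePolicy freshPolicy(long maxDurationMs, Predicate<Throwable> abort) {
    return new MutablePolicy().withMaxDurationMs(maxDurationMs).withAbortPredicate(abort);
  }

  public static void main(String[] args) {
    MutablePolicy shared = new MutablePolicy().withMaxDurationMs(900_000);
    MutablePolicy first = decorateShared(shared, e -> e instanceof IllegalStateException);
    MutablePolicy second = decorateShared(shared, e -> e instanceof IllegalArgumentException);
    // Both callers got the same object, which now carries both abort predicates.
    System.out.println((first == second) + " " + shared.abortPredicates.size()); // prints "true 2"

    MutablePolicy third = freshPolicy(900_000, e -> e instanceof IllegalStateException);
    MutablePolicy fourth = freshPolicy(900_000, e -> e instanceof IllegalArgumentException);
    System.out.println((third == fourth) + " " + third.abortPredicates.size()); // prints "false 1"
  }
}
```

Translated back to this PR, the analogous fix would be to construct a new `RetryPolicy` for each call (or from an immutable config object) instead of calling `abortOn`/`onRetry` on an instance shared across calls.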



[GitHub] [samza] dxichen commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "dxichen (via GitHub)" <gi...@apache.org>.
dxichen commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092650718


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -19,23 +19,75 @@
 
 package org.apache.samza.config;
 
+import java.time.temporal.ChronoUnit;
+import org.apache.samza.util.RetryPolicyConfig;
+
+
 /**
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 15 * 60 * 1000; // 15 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;

Review Comment:
   Out of curiosity, why did we choose this number? Maybe add some documentation on how we arrived at it.
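One reading of 312500 that is consistent with the inline comment in the original code ("100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min") is that the cap equals the initial delay multiplied by the factor exactly five times, so the delay grows for five steps and then stays flat. This is an inference from the numbers, not a documented rationale. Below, d_k is the delay before the (k+1)-th retry, ignoring jitter.

```latex
d_k = \min\left(d_0 \cdot f^{k},\; d_{\max}\right), \qquad d_0 = 100~\text{ms},\quad f = 5
d_5 = 100 \cdot 5^{5} = 100 \cdot 3125 = 312500~\text{ms} \approx 5.2~\text{min} = d_{\max}
\sum_{k=0}^{4} d_k = 100 \cdot \frac{5^{5} - 1}{5 - 1} = 78100~\text{ms}, \qquad \sum_{k=0}^{5} d_k = 78100 + 312500 = 390600~\text{ms} \approx 6.5~\text{min}
```

Under this reading, the cap is first reached on the sixth retry, roughly 6.5 minutes after the first attempt, and every later retry is spaced about 5.2 minutes apart.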



##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -19,23 +19,55 @@
 
 package org.apache.samza.config;
 
+import java.time.temporal.ChronoUnit;
+import org.apache.samza.util.RetryPolicyConfig;
+
+
 /**
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 15 * 60 * 1000; // 15 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
+  public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
+  public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
+  public static final String RETRY_POLICY_JITTER_FACTOR =  RETRY_POLICY_PREFIX + "jitter.factor";

Review Comment:
   Please also add this config to the PR description.
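For reference when updating the description, the retry keys and defaults that appear in the diffs in this thread are collected below in a small Java sketch that reads them from a `java.util.Properties` object. `RetrySettingsSketch` and `RetrySettings.fromProperties` are hypothetical names; the defaults are copied from the diffs (15 min max duration, and the jitter default as the 0.1 factor from the later revision), while the real `BlobStoreConfig` reads values through its own `getInt`/`getLong`-style accessors rather than this code.

```java
import java.util.Properties;

public class RetrySettingsSketch {

  static final String PREFIX = "blob.store.retry.policy.";

  /** Hypothetical immutable holder; defaults are the ones shown in the diffs in this thread. */
  static final class RetrySettings {
    final int maxRetries;
    final long maxRetryDurationMillis;
    final long backoffDelayMillis;
    final long backoffMaxDelayMillis;
    final int backoffDelayFactor;
    final double jitterFactor;

    private RetrySettings(int maxRetries, long maxRetryDurationMillis, long backoffDelayMillis,
        long backoffMaxDelayMillis, int backoffDelayFactor, double jitterFactor) {
      this.maxRetries = maxRetries;
      this.maxRetryDurationMillis = maxRetryDurationMillis;
      this.backoffDelayMillis = backoffDelayMillis;
      this.backoffMaxDelayMillis = backoffMaxDelayMillis;
      this.backoffDelayFactor = backoffDelayFactor;
      this.jitterFactor = jitterFactor;
    }

    static RetrySettings fromProperties(Properties props) {
      return new RetrySettings(
          Integer.parseInt(props.getProperty(PREFIX + "max.retries", "-1")), // -1: unlimited, bounded by duration
          Long.parseLong(props.getProperty(PREFIX + "max.retry.duration.millis", "900000")), // 15 min
          Long.parseLong(props.getProperty(PREFIX + "backoff.delay.millis", "100")),
          Long.parseLong(props.getProperty(PREFIX + "backoff.max.delay.millis", "312500")),
          Integer.parseInt(props.getProperty(PREFIX + "backoff.delay.factor", "5")),
          Double.parseDouble(props.getProperty(PREFIX + "jitter.factor", "0.1")));
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(PREFIX + "max.retry.duration.millis", "1200000"); // override to 20 min
    RetrySettings settings = RetrySettings.fromProperties(props);
    System.out.println(settings.maxRetries + " " + settings.maxRetryDurationMillis + " " + settings.jitterFactor);
    // prints "-1 1200000 0.1"
  }
}
```

Note that these key names differ from the ones in the original PR description (for example `max.retry.duration.millis` rather than `max.retries.duration.ms`), so the description would need those entries updated as well as the new `jitter.factor` key.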



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092643587


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -33,16 +37,16 @@ public class BlobStoreConfig extends MapConfig {
   // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
   public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
   public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
-  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 15 * 60 * 1000; // 15 mins
   public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
   public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
   public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
   public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
   public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
   public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
-  public static final String RETRY_POLICY_JITTER_FACTOR_MILLIS =  RETRY_POLICY_PREFIX + "jitter.factor.millis";
+  public static final String RETRY_POLICY_JITTER_FACTOR =  RETRY_POLICY_PREFIX + "jitter.factor";
   // random retry delay between -100 to 100 millisecond
-  public static final long DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS = 100;
+  public static final double DEFAULT_RETRY_POLICY_JITTER_FACTOR = 0.1;

Review Comment:
   Fix comment above.
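For context on what the corrected comment should say: the old constant was an absolute jitter of 100 ms, while the new one is a factor of 0.1. As we understand Failsafe's `withJitter(double)`, a factor randomly varies each delay by up to that fraction of the delay itself, so 0.1 means up to ±10% rather than up to ±100 ms. The sketch below is a stdlib approximation of both behaviors, not Failsafe's implementation; `jitterByFactor` and `jitterByDuration` are hypothetical helpers.

```java
import java.util.Random;

public class JitterSketch {

  /** Jitter by a fraction of the delay: result lies in [delay * (1 - f), delay * (1 + f)]. */
  static long jitterByFactor(long delayMs, double factor, Random random) {
    double offset = (random.nextDouble() * 2 - 1) * factor * delayMs; // uniform in [-f*d, f*d)
    return Math.round(delayMs + offset);
  }

  /** Jitter by a fixed amount: result lies in [delay - j, delay + j]. */
  static long jitterByDuration(long delayMs, long jitterMs, Random random) {
    double offset = (random.nextDouble() * 2 - 1) * jitterMs; // uniform in [-j, j)
    return Math.round(delayMs + offset);
  }

  public static void main(String[] args) {
    Random random = new Random(42);
    long base = 312_500; // the capped backoff delay discussed in this PR
    System.out.println("factor 0.1: " + jitterByFactor(base, 0.1, random));   // somewhere in 281250..343750
    System.out.println("fixed 100ms: " + jitterByDuration(base, 100, random)); // somewhere in 312400..312600
  }
}
```

The difference matters most at long delays: ±100 ms is negligible against a ~5-minute delay, whereas ±10% spreads those retries by roughly ±31 s, which does more to de-synchronize many tasks retrying at the same time.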



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092590952


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -24,18 +24,59 @@
  */
 public class BlobStoreConfig extends MapConfig {
 
-  public static final String BLOB_STORE_MANAGER_FACTORY = "blob.store.manager.factory";
-  public static final String BLOB_STORE_ADMIN_FACTORY = "blob.store.admin.factory";
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  // Configs related to retry policy of blob stores
+  private static final String RETRY_POLICY_PREFIX = PREFIX + "retry.policy.";
+  public static final String RETRY_POLICY_MAX_RETRIES = RETRY_POLICY_PREFIX + "max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = RETRY_POLICY_PREFIX + "max.retry.duration.millis";
+  public static final long DEFAULT_RETRY_POLICY_MAX_RETRY_DURATION_MILLIS = 10 * 60 * 1000; // 10 mins
+  public static final String RETRY_POLICY_BACKOFF_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_DELAY_MILLIS = 100;
+  public static final String RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = RETRY_POLICY_PREFIX + "backoff.max.delay.millis";
+  public static final long DEFAULT_RETRY_POLICY_BACKOFF_MAX_DELAY_MILLIS = 312500;
+  public static final String RETRY_POLICY_BACKOFF_DELAY_FACTOR = RETRY_POLICY_PREFIX + "backoff.delay.factor";
+  public static final int DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR = 5;
+  public static final String RETRY_POLICY_JITTER_FACTOR_MILLIS =  RETRY_POLICY_PREFIX + "jitter.factor.millis";
+  // random retry delay between -100 to 100 millisecond
+  public static final long DEFAULT_RETRY_POLICY_JITTER_FACTOR_MILLIS = 100;

Review Comment:
   Also fix comment above.



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092592037


##########
samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java:
##########
@@ -82,17 +86,21 @@ public class BlobStoreUtil {
 
   private final BlobStoreManager blobStoreManager;
   private final ExecutorService executor;
+  private final BlobStoreConfig blobStoreConfig;
   private final BlobStoreBackupManagerMetrics backupMetrics;
   private final BlobStoreRestoreManagerMetrics restoreMetrics;
   private final SnapshotIndexSerde snapshotIndexSerde;
+  private final RetryPolicy<Object> retryPolicy;
 
-  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
+  public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, BlobStoreConfig blobStoreConfig,
       BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) {
     this.blobStoreManager = blobStoreManager;
     this.executor = executor;
+    this.blobStoreConfig = blobStoreConfig;
     this.backupMetrics = backupMetrics;
     this.restoreMetrics = restoreMetrics;
     this.snapshotIndexSerde = new SnapshotIndexSerde();
+    this.retryPolicy = buildRetryPolicyFromConfig();

Review Comment:
   Don't access class fields set up in the constructor from methods called in the constructor. This is prone to breaking if the field assignments are reordered. Pass the local variable explicitly instead. It is also generally good practice (e.g., for unit testing the method).
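A minimal sketch of the pattern being requested, using hypothetical stand-in types (`Settings`, `Policy`, `FragileUtil`, `RobustUtil`) rather than the real `BlobStoreConfig`, `RetryPolicy` and `BlobStoreUtil`. The fragile version calls a helper that reads a field before the constructor has assigned it; the robust version passes the constructor argument to a static helper, so assignment order no longer matters and the helper can be unit-tested on its own.

```java
public class ConstructorInitDemo {

  /** Hypothetical stand-in for the retry settings read from config. */
  static final class Settings {
    final long maxDurationMs;

    Settings(long maxDurationMs) {
      this.maxDurationMs = maxDurationMs;
    }
  }

  /** Hypothetical stand-in for a retry policy built from those settings. */
  static final class Policy {
    final long maxDurationMs;

    Policy(long maxDurationMs) {
      this.maxDurationMs = maxDurationMs;
    }
  }

  /** Fragile: the helper reads a field, so it breaks if it runs before that field is assigned. */
  static final class FragileUtil {
    private final Policy policy;
    private final Settings settings;

    FragileUtil(Settings settings) {
      this.policy = buildPolicyFromField(); // this.settings is still null at this point
      this.settings = settings;
    }

    private Policy buildPolicyFromField() {
      return new Policy(settings.maxDurationMs); // throws NullPointerException
    }
  }

  /** Robust: the helper is static and receives its input explicitly. */
  static final class RobustUtil {
    private final Policy policy;
    private final Settings settings;

    RobustUtil(Settings settings) {
      this.policy = buildPolicy(settings); // field assignment order no longer matters
      this.settings = settings;
    }

    static Policy buildPolicy(Settings settings) {
      return new Policy(settings.maxDurationMs);
    }

    Policy policy() {
      return policy;
    }
  }

  public static void main(String[] args) {
    Settings settings = new Settings(900_000);
    System.out.println(new RobustUtil(settings).policy().maxDurationMs); // prints 900000
    try {
      new FragileUtil(settings);
    } catch (NullPointerException e) {
      System.out.println("FragileUtil: settings field was not assigned yet");
    }
  }
}
```

Making the helper `static` also lets the compiler enforce the rule: a static method cannot read instance fields, so it must receive everything it needs as parameters.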



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092595234


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -143,20 +143,32 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
   }
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action,
-      Predicate<? extends Throwable> abortRetries,
+      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,

Review Comment:
   Is this method still used somewhere? If not, remove.



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1092645895


##########
samza-core/src/main/java/org/apache/samza/util/FutureUtil.java:
##########
@@ -144,31 +141,21 @@ public static <K, V> CompletableFuture<Map<K, V>> toFutureOfMap(
 
   public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
       Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor) {
-    Duration maxDuration = Duration.ofMinutes(10);
+      ExecutorService executor, RetryPolicyConfig retryPolicyConfig) {
 
     RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
-        .withMaxRetries(-1) // Sets maximum retry to unlimited from default of 3 attempts. Retries are now limited by max duration and not retry counts.
-        .withBackoff(100, 312500, ChronoUnit.MILLIS, 5) // 100 ms, 500 ms, 2500 ms, 12.5 s, 1.05 min, 5.20 min, 5.20 min
-        .withMaxDuration(maxDuration)
-        .withJitter(Duration.ofMillis(100))
-        .abortOn(abortRetries) // stop retrying if predicate returns true
-        .onRetry(e -> LOG.warn("Action: {} attempt: {} completed with error {} ms after start. Retrying up to {} ms.",
-            opName, e.getAttemptCount(), e.getElapsedTime().toMillis(), maxDuration.toMillis(), e.getLastFailure()));
-
-    return executeAsyncWithRetries(opName, action, abortRetries, executor, retryPolicy);
-  }
-
-  public static <T> CompletableFuture<T> executeAsyncWithRetries(String opName,
-      Supplier<? extends CompletionStage<T>> action, Predicate<? extends Throwable> abortRetries,
-      ExecutorService executor, RetryPolicy<Object> retryPolicy) {
-
-    RetryPolicy<Object> retryPolicyWithLog = retryPolicy.abortOn(abortRetries)
+        .withMaxRetries(retryPolicyConfig.getMaxRetries())

Review Comment:
   Doesn't seem right. If RetryPolicyConfig exists, is there a better way to build the RetryPolicy than copy-setting every property? E.g. with a constructor that takes the config, or a builder that accepts the config?



[GitHub] [samza] prateekm commented on a diff in pull request #1650: Make blob store related retry tunables configurable

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1650:
URL: https://github.com/apache/samza/pull/1650#discussion_r1086024392


##########
samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java:
##########
@@ -23,19 +23,56 @@
  * Config related helper methods for BlobStore.
  */
 public class BlobStoreConfig extends MapConfig {
+  
+  private static final String PREFIX = "blob.store.";
+  public static final String BLOB_STORE_MANAGER_FACTORY = PREFIX + "manager.factory";
+  public static final String DEFAULT_BLOB_STORE_MANAGER_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreManagerFactory";
+  public static final String BLOB_STORE_ADMIN_FACTORY = PREFIX + "admin.factory";
+  public static final String DEFAULT_BLOB_STORE_ADMIN_FACTORY = "com.linkedin.samza.ambry.AmbryBlobStoreAdminFactory";
+  // Configs related to retry policy of blob stores
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES = PREFIX + "retry.policy.max.retries";
+  // -1 for RetryPolicy means unlimited retries. Retry is limited by max retry duration, rather than count of retries.
+  public static final int DEFAULT_BLOB_STORE_RETRY_POLICY_MAX_RETRIES = -1;
+  public static final String BLOB_STORE_RETRY_POLICY_MAX_RETRIES_DURATION_MILLIS = PREFIX + "retry.policy.max.retires.duration.millis";

Review Comment:
   Typo, s/retires/retries.


