Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/12 13:13:50 UTC

[GitHub] [flink] zoltar9264 opened a new pull request, #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 opened a new pull request, #19441:
URL: https://github.com/apache/flink/pull/19441

   ## What is the purpose of the change
   
   As discussed in [FLINK-27187](https://issues.apache.org/jira/browse/FLINK-27187), add the changelog storage metric "totalAttemptsPerUpload", which represents the distribution of the total number of attempts per upload.
   
   ## Brief change log
   
     - Add totalAttemptsPerUpload to ChangelogStorageMetricGroup
     - RetryingExecutor records the total number of attempts and updates totalAttemptsPerUpload with it
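A minimal sketch of this bookkeeping follows. It assumes that attempts of one upload can overlap, because a new attempt is started when the previous one times out, and that only the first attempt to complete counts. `UploadAttemptTracker`, `startAttempt` and `tryComplete` are hypothetical names, not the `RetryingExecutor` API. Plain lists stand in for the histograms in `ChangelogStorageMetricGroup`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical bookkeeping for ONE upload whose attempts may overlap (a new attempt starts
 * when the previous one times out). Plain lists stand in for the metric histograms.
 */
class UploadAttemptTracker {
    private final AtomicInteger totalAttempts = new AtomicInteger();
    private final AtomicBoolean completed = new AtomicBoolean();
    private final List<Integer> attemptsPerUpload;
    private final List<Integer> totalAttemptsPerUpload;

    UploadAttemptTracker(List<Integer> attemptsPerUpload, List<Integer> totalAttemptsPerUpload) {
        this.attemptsPerUpload = attemptsPerUpload;
        this.totalAttemptsPerUpload = totalAttemptsPerUpload;
    }

    /** Called when an attempt is started; returns its 1-based attempt number. */
    int startAttempt() {
        return totalAttempts.incrementAndGet();
    }

    /**
     * Called when an attempt succeeds. Only the first success is recorded: its own number goes
     * to attemptsPerUpload, the number of attempts started so far to totalAttemptsPerUpload.
     */
    boolean tryComplete(int attemptNumber) {
        if (!completed.compareAndSet(false, true)) {
            return false; // another attempt already completed this upload
        }
        attemptsPerUpload.add(attemptNumber);
        totalAttemptsPerUpload.add(totalAttempts.get());
        return true;
    }

    public static void main(String[] args) {
        List<Integer> attempts = new ArrayList<>();
        List<Integer> totals = new ArrayList<>();
        UploadAttemptTracker tracker = new UploadAttemptTracker(attempts, totals);
        int first = tracker.startAttempt(); // attempt 1 starts, then times out
        tracker.startAttempt(); // attempt 2 starts, then times out
        int third = tracker.startAttempt(); // attempt 3 starts
        tracker.tryComplete(first); // the slow first attempt finishes first
        tracker.tryComplete(third); // ignored: the upload is already complete
        System.out.println(attempts + " " + totals); // prints [1] [3]
    }
}
```

Here attemptsPerUpload records 1 while totalAttemptsPerUpload records 3. That is the kind of difference the new metric can expose.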
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - *Added test ChangelogStorageMetricsTest.testTotalAttemptsPerUpload(), which validates that totalAttemptsPerUpload is calculated correctly*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r856152839


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }
+            }
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                try {
+                    remainingAttempts.await();

Review Comment:
   You're right, completing first and last attempts sounds good.





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854735013


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Got it. I have changed the implementation of testTotalAttemptsPerUpload and updated the PR.





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r855760227


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }
+            }
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                try {
+                    remainingAttempts.await();

Review Comment:
   If the last attempt throws an exception, the upload may fail, so I made the first and last attempts wait and the others throw an exception. WDYT?
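A hypothetical sketch of this behavior follows. It uses a plain `String` task id instead of Flink's `UploadTask` and a simplified `upload` signature; both are assumptions. In this variant the first attempt waits until the last attempt has started, intermediate attempts throw, and the last attempt completes normally instead of waiting, so the final attempt never fails the upload.

```java
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical test uploader keyed by task id: per task, attempt 1 waits until the last
 * attempt has started, attempts in between fail, and the last attempt succeeds.
 */
class FirstAndLastWaitingUploader {
    private static final class TaskState {
        final AtomicInteger attempts = new AtomicInteger();
        final CountDownLatch lastAttemptStarted = new CountDownLatch(1);
    }

    private final int maxAttempts;
    private final ConcurrentHashMap<String, TaskState> states = new ConcurrentHashMap<>();

    FirstAndLastWaitingUploader(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts < 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Number of attempts started so far for the given task (0 if none). */
    int attemptsStarted(String taskId) {
        TaskState state = states.get(taskId);
        return state == null ? 0 : state.attempts.get();
    }

    /** Returns the number of the attempt that succeeded; intermediate attempts throw. */
    int upload(String taskId) throws IOException, InterruptedException {
        TaskState state = states.computeIfAbsent(taskId, ignored -> new TaskState());
        int attempt = state.attempts.incrementAndGet();
        if (attempt == maxAttempts) {
            state.lastAttemptStarted.countDown(); // every attempt has now been triggered
        }
        if (attempt == 1) {
            state.lastAttemptStarted.await(); // first attempt: wait for the last one to start
            return attempt;
        }
        if (attempt < maxAttempts) {
            throw new IOException("simulated failure of attempt " + attempt);
        }
        return attempt; // last attempt: complete instead of failing the whole upload
    }
}
```

The first attempt cannot finish before the last one has started. A total-attempts metric would therefore see `maxAttempts` no matter which of the two completes first. `attemptsStarted` exists only so a test can order the calls deterministically.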





[GitHub] [flink] rkhachatryan merged pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan merged PR #19441:
URL: https://github.com/apache/flink/pull/19441




[GitHub] [flink] flinkbot commented on pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

flinkbot commented on PR #19441:
URL: https://github.com/apache/flink/pull/19441#issuecomment-1096717465

   ## CI report:
   
   * eec54f3f0d7c6eb33d0bc1d22910e1857bf1ee23 UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854958316


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Thanks!





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854002057


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   I mean `MaxAttemptUploader` would have only one behavior: as you said, the first attempt waits until all the other attempts have been triggered.
   Then both testAttemptsPerUpload and testTotalAttemptsPerUpload can use `MaxAttemptUploader`. WDYT?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r852664044


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Could you explain why `sleep` is needed here and why an existing `MaxAttemptUploader` can not be used?





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r853922622


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Thanks for the nice suggestion, @rkhachatryan.
   
   First, yes, testTotalAttemptsPerUpload completes the first attempt.
   
   Second, I agree with keeping these tests separate.
   
   Third, regarding the implementation of testTotalAttemptsPerUpload, I think you are right.
   I think we can implement waiting in the 1st attempt for the others in MaxAttemptUploader. What do you think?
   





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r853964273


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Regarding `MaxAttemptUploader`, do you mean it would have two types of behavior?
   I think this could make it less readable, but probably you have a better idea on how to do it, so please go ahead.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854959208


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }

Review Comment:
   NIT: can be replaced with
   ```
   remainingAttemptsPerTask
           .computeIfAbsent(uploadTask, ign -> new CountDownLatch(maxAttempts - 1))
           .countDown();
   ```
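In the suggested one-liner, `countDown()` also runs on the first attempt. The latch therefore needs an initial count of `maxAttempts`, not `maxAttempts - 1`, to reach zero at the same point as the original if/else, which is when the `maxAttempts`-th attempt starts. The sketch below compares both variants on a single hypothetical task key. `LatchInitDemo` and its helpers are illustrative only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Compares the original if/else latch handling with the computeIfAbsent one-liner. */
class LatchInitDemo {
    /** Original style: the first attempt creates the latch, later attempts count it down. */
    static long originalRemaining(int maxAttempts, int attemptsStarted) {
        ConcurrentHashMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();
        for (int i = 0; i < attemptsStarted; i++) {
            CountDownLatch latch = latches.get("task");
            if (latch == null) {
                latches.put("task", new CountDownLatch(maxAttempts - 1));
            } else {
                latch.countDown();
            }
        }
        return latches.get("task").getCount(); // requires attemptsStarted >= 1
    }

    /** One-liner style: every attempt, including the first, counts down. */
    static long oneLinerRemaining(int initialCount, int attemptsStarted) {
        ConcurrentHashMap<String, CountDownLatch> latches = new ConcurrentHashMap<>();
        for (int i = 0; i < attemptsStarted; i++) {
            latches.computeIfAbsent("task", ignored -> new CountDownLatch(initialCount)).countDown();
        }
        return latches.get("task").getCount(); // requires attemptsStarted >= 1
    }

    public static void main(String[] args) {
        int maxAttempts = 3;
        for (int started = 1; started <= maxAttempts; started++) {
            System.out.printf(
                    "started=%d original=%d init(maxAttempts)=%d init(maxAttempts-1)=%d%n",
                    started,
                    originalRemaining(maxAttempts, started),
                    oneLinerRemaining(maxAttempts, started),
                    oneLinerRemaining(maxAttempts - 1, started));
        }
    }
}
```

With `maxAttempts = 3`, the original code and the `maxAttempts` variant both leave counts 2, 1 and 0 after the first, second and third attempts. The `maxAttempts - 1` variant already reaches 0 after the second attempt.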



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }
+            }
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                try {
+                    remainingAttempts.await();

Review Comment:
   I was thinking that only the 1st attempt should wait, and all the others should fail by throwing an exception. That would be more likely to catch bugs.
   WDYT?



##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }
+            }
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                try {
+                    remainingAttempts.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();

Review Comment:
   Should we throw an exception here (in addition to thread.interrupt)?
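One common way to do both is sketched below, under the assumption that the waiting code runs inside a method declared `throws IOException`, as `upload` is. The sketch restores the interrupt flag and then throws `java.io.InterruptedIOException` with the original exception as its cause. `InterruptibleWait.awaitOrThrow` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;

/** Hypothetical helper: wait on a latch from a method that may only throw IOException. */
class InterruptibleWait {
    static void awaitOrThrow(CountDownLatch latch) throws IOException {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe it...
            Thread.currentThread().interrupt();
            // ...and fail the attempt instead of continuing as if the wait had succeeded.
            InterruptedIOException ioe = new InterruptedIOException("interrupted while waiting");
            ioe.initCause(e);
            throw ioe;
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate cancellation of the upload thread
        try {
            awaitOrThrow(new CountDownLatch(1));
            System.out.println("completed");
        } catch (IOException e) {
            System.out.println("failed: " + e + ", interrupted=" + Thread.interrupted());
        }
    }
}
```

Throwing makes the interrupted attempt fail visibly. The restored flag still lets the caller observe the interruption.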





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r855755359


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }

Review Comment:
   Thanks @rkhachatryan , nice tip.





[GitHub] [flink] zoltar9264 commented on pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on PR #19441:
URL: https://github.com/apache/flink/pull/19441#issuecomment-1104916495

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854012871


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   In that case, `testAttemptsPerUpload` would assert that `metrics.AttemptsPerUpload == 1`, right?
   That doesn't seem very useful because 1 is likely the number of attempts in any case, even without retries, so it wouldn't catch many bugs.





[GitHub] [flink] zoltar9264 commented on pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on PR #19441:
URL: https://github.com/apache/flink/pull/19441#issuecomment-1106451265

   @flinkbot run azure




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r853867505


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   Thanks for the explanation. 
   In other words, `testAttemptsPerUpload` completes the last attempt and `testTotalAttemptsPerUpload` completes the first attempt, right?
   
   IMO it's better to keep these tests separate, because the scenarios and the assertions are different.
   
   As for the implementation of `testTotalAttemptsPerUpload`, how about waiting in the 1st attempt for the others to fail?
   Something like `CountDownLatch` or `CompletableFuture` + `AtomicInteger`?
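Below is a hypothetical sketch of the `CompletableFuture` + `AtomicInteger` option for a single task. One counter numbers the attempts and a second counts failures. The first attempt blocks on a future that the last failing attempt completes. `FirstWaitsForOthersUploader` and its `upload()` signature are illustrative, not the test's actual code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical single-task uploader: attempt 1 blocks until attempts 2..maxAttempts have all
 * failed; every attempt after the first throws an IOException.
 */
class FirstWaitsForOthersUploader {
    private final int maxAttempts;
    private final AtomicInteger attempts = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final CompletableFuture<Void> othersFailed = new CompletableFuture<>();

    FirstWaitsForOthersUploader(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts < 1");
        }
        this.maxAttempts = maxAttempts;
        if (maxAttempts == 1) {
            othersFailed.complete(null); // no other attempts to wait for
        }
    }

    /** Number of attempts started so far; lets a test order its calls. */
    int attemptsStarted() {
        return attempts.get();
    }

    int upload() throws IOException, InterruptedException, ExecutionException {
        int attempt = attempts.incrementAndGet();
        if (attempt == 1) {
            othersFailed.get(); // block until the other attempts have failed
            return attempt;
        }
        if (failures.incrementAndGet() == maxAttempts - 1) {
            othersFailed.complete(null); // the last failing attempt releases the first one
        }
        throw new IOException("simulated failure of attempt " + attempt);
    }
}
```

Whether a failing final attempt fails the whole upload depends on the retry logic, which this sketch does not model.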





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

rkhachatryan commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r854959208


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +346,55 @@ public void close() {
         }
     }
 
+    private static class WaitingMaxAttemptUploader implements StateChangeUploader {
+        private final ConcurrentHashMap<UploadTask, CountDownLatch> remainingAttemptsPerTask;
+        private final int maxAttempts;
+
+        public WaitingMaxAttemptUploader(int maxAttempts) {
+            if (maxAttempts < 1) {
+                throw new IllegalArgumentException("maxAttempts < 0");
+            }
+            this.maxAttempts = maxAttempts;
+            this.remainingAttemptsPerTask = new ConcurrentHashMap<>();
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+
+            for (UploadTask uploadTask : tasks) {
+                CountDownLatch remainingAttempts = remainingAttemptsPerTask.get(uploadTask);
+                if (remainingAttempts == null) {
+                    remainingAttempts = new CountDownLatch(maxAttempts - 1);
+                    remainingAttemptsPerTask.put(uploadTask, remainingAttempts);
+                } else {
+                    remainingAttempts.countDown();
+                }

Review Comment:
   NIT: can be replaced with
   ```
   remainingAttemptsPerTask
           .computeIfAbsent(uploadTask, ign -> new CountDownLatch(maxAttempts))
           .countDown();
   ```





[GitHub] [flink] zoltar9264 commented on a diff in pull request #19441: [FLINK-27187][state/changelog] Add changelog storage metric totalAttemptsPerUpload

zoltar9264 commented on code in PR #19441:
URL: https://github.com/apache/flink/pull/19441#discussion_r853687720


##########
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java:
##########
@@ -295,6 +349,39 @@ public void close() {
         }
     }
 
+    private static class FixedLatencyUploader implements StateChangeUploader {
+        private final long latency;
+
+        public FixedLatencyUploader(long latency) {
+            this.latency = latency;
+        }
+
+        @Override
+        public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+
+            try {
+                TimeUnit.MILLISECONDS.sleep(latency);

Review Comment:
   I want to simulate a timeout rather than an IOException. The difference between the two is that with MaxAttemptUploader, attemptNumber must equal totalAttempt, while with FixedLatencyUploader, attemptNumber is less than totalAttempt. With FixedLatencyUploader, the first attempt succeeds.
   
   In fact, I considered merging testAttemptsPerUpload and testTotalAttemptsPerUpload. This can be implemented by adding the following to testTotalAttemptsPerUpload:
   
   ```
   HistogramStatistics attemptsPerUploadHistogram =
           metrics.getAttemptsPerUpload().getStatistics();
   assertThat(attemptsPerUploadHistogram.getMin()).isEqualTo(1);
   assertThat(attemptsPerUploadHistogram.getMax()).isEqualTo(1);
   ```
   
   What do you think, @rkhachatryan?
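The two scenarios can be worked through with a small deterministic model. The model assumes that a new attempt is started each time the timeout elapses while no earlier attempt has finished, and that the first attempt to finish wins. `RetryTimeline.simulate` is hypothetical and models only timeouts, not failures. It is not taken from `RetryingExecutor`.

```java
/** Hypothetical timeout-only retry model: returns {successfulAttempt, totalAttempts}. */
class RetryTimeline {
    static int[] simulate(long timeoutMs, long[] durationsMs, int maxAttempts) {
        long earliestFinish = Long.MAX_VALUE; // earliest finish time among started attempts
        int winner = 0;
        int started = 0;
        for (int k = 0; k < Math.min(maxAttempts, durationsMs.length); k++) {
            long start = k * timeoutMs; // attempt k+1 starts once k timeouts have elapsed
            if (start >= earliestFinish) {
                break; // an earlier attempt already finished, so no new attempt is started
            }
            started++;
            long finish = start + durationsMs[k];
            if (finish < earliestFinish) {
                earliestFinish = finish;
                winner = k + 1;
            }
        }
        return new int[] {winner, started};
    }

    public static void main(String[] args) {
        // The first attempt is slow but still finishes before the retries do.
        int[] slowFirst = simulate(10, new long[] {25, 100, 100}, 3);
        System.out.println(slowFirst[0] + " of " + slowFirst[1]); // prints "1 of 3"
        // The first attempt finishes before its timeout, so no retry is started.
        int[] fast = simulate(10, new long[] {5, 5, 5}, 3);
        System.out.println(fast[0] + " of " + fast[1]); // prints "1 of 1"
    }
}
```

In the first case the attempt takes 25 ms against a 10 ms timeout. It still wins, but only after three attempts have been started, so the successful attempt number (1) is lower than the total (3). In the failure case every attempt ends before the next one starts, so the two numbers coincide.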


