You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2021/06/04 01:52:31 UTC

[kafka] branch trunk updated: minor style fixes to raft client (#10809)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0358c21  minor style fixes to raft client (#10809)
0358c21 is described below

commit 0358c21ae4d78c2f035f5d0fb1ed087611ec2ba5
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Jun 3 18:51:03 2021 -0700

    minor style fixes to raft client (#10809)
    
    Style fixes to KafkaRaftClient
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 52 +++++++++-------------
 1 file changed, 21 insertions(+), 31 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index e0e7cb4..f004203 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -309,17 +309,15 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         for (ListenerContext listenerContext : listenerContexts) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
                 if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
-                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> {
-                        return new IllegalStateException(
-                            String.format(
-                                "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
-                                listenerContext.listener.getClass().getTypeName(),
-                                nextExpectedOffset,
-                                log.startOffset(),
-                                highWatermark
-                            )
-                        );
-                    });
+                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
+                        String.format(
+                            "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
+                            listenerContext.listener.getClass().getTypeName(),
+                            nextExpectedOffset,
+                            log.startOffset(),
+                            highWatermark
+                        )
+                    ));
                     listenerContext.fireHandleSnapshot(snapshot);
                 }
             });
@@ -347,14 +345,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) {
         for (ListenerContext listenerContext : listenerContexts) {
             OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
-            if (!nextExpectedOffsetOpt.isPresent()) {
-                continue;
-            }
-
-            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
-            if (nextExpectedOffset == baseOffset) {
-                listenerContext.fireHandleCommit(baseOffset, epoch, records);
-            }
+            nextExpectedOffsetOpt.ifPresent(nextOffset -> {
+                if (nextOffset == baseOffset)
+                    listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            });
         }
     }
 
@@ -388,7 +382,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             if (quorum.isVoter()
                 && quorum.remoteVoters().isEmpty()
                 && !quorum.isCandidate()) {
-
                 transitionToCandidate(currentTimeMs);
             }
         } catch (IOException e) {
@@ -449,7 +442,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
-        // We update the end offset before flushing so that parked fetches can return sooner
+        // We update the end offset before flushing so that parked fetches can return sooner.
         updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
         log.flush();
     }
@@ -502,11 +495,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         resetConnections();
 
         // After becoming a follower, we need to complete all pending fetches so that
-        // they can be resent to the leader without waiting for their expiration
+        // they can be re-sent to the leader without waiting for their expirations
         fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
             "Cannot process the fetch request because the node is no longer the leader."));
 
-        // Clearing the append purgatory should complete all future exceptionally since this node is no longer the leader
+        // Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
         appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
             "Failed to receive sufficient acknowledgments for this append before leader change."));
     }
@@ -552,7 +545,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
 
         if (!hasValidTopicPartition(request, log.topicPartition())) {
-            // Until we support multi-raft, we treat topic partition mismatches as invalid requests
+            // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
             return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
         }
 
@@ -638,7 +631,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                             binaryExponentialElectionBackoffMs(state.retries())
                         );
                     }
-
                 }
             } else {
                 logger.debug("Ignoring vote response {} since we are no longer a candidate in epoch {}",
@@ -1072,7 +1064,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             FetchResponseData.EpochEndOffset divergingEpoch = partitionResponse.divergingEpoch();
             if (divergingEpoch.epoch() >= 0) {
                 // The leader is asking us to truncate before continuing
-                OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
+                final OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
                     divergingEpoch.endOffset(), divergingEpoch.epoch());
 
                 state.highWatermark().ifPresent(highWatermark -> {
@@ -1104,7 +1096,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                     );
                     return false;
                 } else {
-                    OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+                    final OffsetAndEpoch snapshotId = new OffsetAndEpoch(
                         partitionResponse.snapshotId().endOffset(),
                         partitionResponse.snapshotId().epoch()
                     );
@@ -1193,7 +1185,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
      */
     private FetchSnapshotResponseData handleFetchSnapshotRequest(
         RaftRequest.Inbound requestMetadata
-    ) throws IOException {
+    ) {
         FetchSnapshotRequestData data = (FetchSnapshotRequestData) requestMetadata.data;
 
         if (!hasValidClusterId(data.clusterId())) {
@@ -2038,7 +2030,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
     }
 
-    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) throws IOException {
+    private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
         final Supplier<ApiMessage> requestSupplier;
 
         if (state.fetchingSnapshot().isPresent()) {
@@ -2465,7 +2457,5 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                 wakeup();
             }
         }
-
     }
-
 }