You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 03:08:14 UTC

[pulsar] branch master updated: [cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 62e2a9abd02 [cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)
62e2a9abd02 is described below

commit 62e2a9abd02e99a81e6d4811046f4799f027ce7a
Author: Elliot West <el...@streamnative.io>
AuthorDate: Thu Aug 11 04:08:08 2022 +0100

    [cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)
    
    ### Motivation
    
    To reduce warnings generated during compilation by cleaning up the code.
    
    ```
    [WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[334,9] finally clause cannot complete normally
    [WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[642,9] finally clause cannot complete normally
    ```
    
    ### Modifications
    
    * Moved `return` from `finally` blocks
    * Simplified code through the use of simpler, equivalent language constructs
    * Made some constant fields `final`
    
    Co-authored-by: Matteo Merli <mm...@apache.org>
---
 .../pulsar/functions/worker/SchedulerManager.java  | 114 ++++++++-------------
 1 file changed, 43 insertions(+), 71 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 79712398bef..af633ca2e9b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,6 +63,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.proto.Function;
@@ -71,7 +74,6 @@ import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
-@Slf4j
 /**
  * The scheduler manager is used to compute scheduling of function instances
  * Only the leader computes new schedulings and writes assignments to the assignment topic
@@ -80,6 +82,7 @@ import org.apache.pulsar.functions.worker.scheduler.IScheduler;
  *  2. When worker loses leadership, this class will be closed which
  *  also closes the worker's producer to the assignments topic
  */
+@Slf4j
 public class SchedulerManager implements AutoCloseable {
 
     private final WorkerConfig workerConfig;
@@ -109,7 +112,7 @@ public class SchedulerManager implements AutoCloseable {
     private final PulsarAdmin admin;
 
     @Getter
-    private Lock schedulerLock = new ReentrantLock(true);
+    private final Lock schedulerLock = new ReentrantLock(true);
 
     private volatile boolean isRunning = false;
 
@@ -123,11 +126,9 @@ public class SchedulerManager implements AutoCloseable {
 
     private MessageId metadataTopicLastMessage = MessageId.earliest;
 
-    private AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
-    private Future<?> currentRebalanceFuture;
+    private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
 
-    private AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
-    private Future<?> currentDrainFuture;
+    private final AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
     // The list of assignments moved due to the last drain op on a leader. Used in UTs, and debugging.
     private List<Assignment> assignmentsMovedInLastDrain;
 
@@ -136,13 +137,13 @@ public class SchedulerManager implements AutoCloseable {
         DrainNotInProgress,
         DrainInProgress,
         DrainCompleted
-    };
+    }
 
     // A map to hold the status of recent drain operations.
     // It is of the form {workerId : DrainOpStatus}.
     // Entries are added when a drain operation starts, and removed on a periodic (when the worker is no longer seen
     // on a poll).
-    private ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<String, DrainOpStatus>();
+    private final ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<>();
 
     public SchedulerManager(WorkerConfig workerConfig,
                             PulsarClient pulsarClient,
@@ -248,8 +249,7 @@ public class SchedulerManager implements AutoCloseable {
                 rebalanceInProgress.set(false);
                 throw new TooFewWorkersException();
             }
-            currentRebalanceFuture = rebalance();
-            return currentRebalanceFuture;
+            return rebalance();
         } else {
             throw new RebalanceInProgressException();
         }
@@ -289,8 +289,7 @@ public class SchedulerManager implements AutoCloseable {
                     throw new UnknownWorkerException();
                 }
 
-                currentDrainFuture = drain(workerId);
-                return currentDrainFuture;
+                return drain(workerId);
             } finally {
                 drainInProgressFlag.set(false);
             }
@@ -301,37 +300,25 @@ public class SchedulerManager implements AutoCloseable {
 
     public LongRunningProcessStatus getDrainStatus(String workerId) {
         long startTime = System.nanoTime();
-        String errString;
-        LongRunningProcessStatus retVal = new LongRunningProcessStatus();
-        try {
-            val workerStatus = drainOpStatusMap.get(workerId);
-            if (workerStatus == null) {
-                errString = "Worker " + workerId + " not found in drain records";
-                retVal = LongRunningProcessStatus.forError(errString);
-            } else {
-                switch (workerStatus) {
-                    default:
-                        errString =
-                                "getDrainStatus: Unexpected status " + workerStatus + " found for worker " + workerId;
-                        retVal = LongRunningProcessStatus.forError(errString);
-                        break;
-                    case DrainCompleted:
-                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
-                        break;
-                    case DrainInProgress:
-                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
-                        break;
-                    case DrainNotInProgress:
-                        retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
-                        break;
-                }
-            }
-        } finally {
-            log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
-                    workerId, (System.nanoTime() - startTime) / Math.pow(10, 9),
-                    retVal.status, retVal.lastError);
-            return retVal;
-        }
+        LongRunningProcessStatus status = Optional.ofNullable(workerId).map(id ->
+                Optional.ofNullable(drainOpStatusMap.get(id)).map(opStatus ->
+                        switch (opStatus) {
+                            case DrainCompleted ->
+                                    LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                            case DrainInProgress ->
+                                    LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+                            case DrainNotInProgress ->
+                                    LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                        }).orElse(
+                        LongRunningProcessStatus.forError("Worker " + id + " not found in drain records")
+                )
+        ).orElse(
+                new LongRunningProcessStatus()
+        );
+        log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
+                workerId, NANOSECONDS.toSeconds (System.nanoTime() - startTime),
+                status.status, status.lastError);
+        return status;
     }
 
     // The following method is used only for testing.
@@ -351,8 +338,7 @@ public class SchedulerManager implements AutoCloseable {
     // The following method is used only for testing.
     @VisibleForTesting
     ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
-        val retVal = new ConcurrentHashMap<String, DrainOpStatus>(drainOpStatusMap);
-        return retVal;
+        return new ConcurrentHashMap<>(drainOpStatusMap);
     }
 
     private synchronized int getCurrentAvailableNumWorkers() {
@@ -361,16 +347,10 @@ public class SchedulerManager implements AutoCloseable {
 
     private synchronized Set<String> getCurrentAvailableWorkers() {
         Set<String> currentMembership = membershipManager.getCurrentMembership()
-                .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
+                .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());
 
         // iterate the set, instead of the concurrent hashmap
-        Iterator<String> iter = currentMembership.iterator();
-        while (iter.hasNext()) {
-            if (drainOpStatusMap.containsKey(iter.next())) {
-                iter.remove();
-            }
-        }
-
+        currentMembership.removeIf(drainOpStatusMap::containsKey);
         return currentMembership;
     }
 
@@ -445,11 +425,7 @@ public class SchedulerManager implements AutoCloseable {
                     String workerId = workerIdToAssignmentEntry.getKey();
                     // remove assignments to workers that don't exist / died for now.
                     // wait for failure detector to unassign them in the future for re-scheduling
-                    if (!availableWorkers.contains(workerId)) {
-                        return false;
-                    }
-
-                    return true;
+                    return availableWorkers.contains(workerId);
                 })
                 .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
                 .collect(Collectors.toList());
@@ -505,11 +481,7 @@ public class SchedulerManager implements AutoCloseable {
                     String workerId = workerIdToAssignmentEntry.getKey();
                     // remove assignments to workers that don't exist / died for now.
                     // wait for failure detector to unassign them in the future for re-scheduling
-                    if (!availableWorkers.contains(workerId)) {
-                        return false;
-                    }
-
-                    return true;
+                    return availableWorkers.contains(workerId);
                 })
                 .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
                 .collect(Collectors.toList());
@@ -638,8 +610,8 @@ public class SchedulerManager implements AutoCloseable {
                     workerId, drainSuccessful ? "" : "un",
                     (System.nanoTime() - startTime) / Math.pow(10, 9),
                     schedulerStats.getSummary(), schedulerStats);
-            return postDrainAssignments;
         }
+        return postDrainAssignments;
     }
 
     private void compactAssignmentTopic() {
@@ -648,7 +620,7 @@ public class SchedulerManager implements AutoCloseable {
                 this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
             } catch (PulsarAdminException e) {
                 log.error("Failed to trigger compaction", e);
-                scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+                scheduledExecutorService.schedule(this::compactAssignmentTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
                         TimeUnit.SECONDS);
             }
         }
@@ -660,7 +632,7 @@ public class SchedulerManager implements AutoCloseable {
 
         if (drainOpStatusMap.size() > 0) {
             val currentMembership = membershipManager.getCurrentMembership()
-                    .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
+                    .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());
             val removeWorkerIds = new ArrayList<String>();
 
             for (String workerId : drainOpStatusMap.keySet()) {
@@ -688,7 +660,7 @@ public class SchedulerManager implements AutoCloseable {
                 this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
             } catch (PulsarAdminException e) {
                 log.error("Failed to trigger compaction", e);
-                scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+                scheduledExecutorService.schedule(this::compactFunctionMetadataTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
                         TimeUnit.SECONDS);
             }
         }
@@ -708,7 +680,7 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
-                                                                     boolean externallyManagedRuntime) {
+                                                                      boolean externallyManagedRuntime) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
             for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
@@ -719,7 +691,7 @@ public class SchedulerManager implements AutoCloseable {
     }
 
     static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
-                                                           boolean externallyManagedRuntime) {
+                                                    boolean externallyManagedRuntime) {
         List<Function.Instance> functionInstances = new LinkedList<>();
         if (!externallyManagedRuntime) {
             int instances = functionMetaData.getFunctionDetails().getParallelism();
@@ -831,9 +803,9 @@ public class SchedulerManager implements AutoCloseable {
             private boolean alive;
         }
 
-        private Map<String, WorkerStats> workerStatsMap = new HashMap<>();
+        private final Map<String, WorkerStats> workerStatsMap = new HashMap<>();
 
-        private Map<String, String> instanceToWorkerId = new HashMap<>();
+        private final Map<String, String> instanceToWorkerId = new HashMap<>();
 
         public SchedulerStats(Map<String, Map<String, Assignment>> workerIdToAssignments, Set<String> workers) {