You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 03:08:14 UTC
[pulsar] branch master updated: [cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 62e2a9abd02 [cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)
62e2a9abd02 is described below
commit 62e2a9abd02e99a81e6d4811046f4799f027ce7a
Author: Elliot West <el...@streamnative.io>
AuthorDate: Thu Aug 11 04:08:08 2022 +0100
[cleanup][pulsar-functions] Remove compiler warnings for return from finally (#16451)
### Motivation
To reduce warnings generated during compilation by cleaning up the code.
```
[WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[334,9] finally clause cannot complete normally
[WARNING] /Users/ewest/git/pulsar/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java:[642,9] finally clause cannot complete normally
```
### Modifications
* Moved `return` from `finally` blocks
* Simplified code through the use of simpler, equivalent language constructs
* Made some constant fields `final`
Co-authored-by: Matteo Merli <mm...@apache.org>
---
.../pulsar/functions/worker/SchedulerManager.java | 114 ++++++++-------------
1 file changed, 43 insertions(+), 71 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 79712398bef..af633ca2e9b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
@@ -31,6 +32,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,6 +63,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
@@ -71,7 +74,6 @@ import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-@Slf4j
/**
* The scheduler manager is used to compute scheduling of function instances
* Only the leader computes new schedulings and writes assignments to the assignment topic
@@ -80,6 +82,7 @@ import org.apache.pulsar.functions.worker.scheduler.IScheduler;
* 2. When worker loses leadership, this class will be closed which
* also closes the worker's producer to the assignments topic
*/
+@Slf4j
public class SchedulerManager implements AutoCloseable {
private final WorkerConfig workerConfig;
@@ -109,7 +112,7 @@ public class SchedulerManager implements AutoCloseable {
private final PulsarAdmin admin;
@Getter
- private Lock schedulerLock = new ReentrantLock(true);
+ private final Lock schedulerLock = new ReentrantLock(true);
private volatile boolean isRunning = false;
@@ -123,11 +126,9 @@ public class SchedulerManager implements AutoCloseable {
private MessageId metadataTopicLastMessage = MessageId.earliest;
- private AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
- private Future<?> currentRebalanceFuture;
+ private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
- private AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
- private Future<?> currentDrainFuture;
+ private final AtomicBoolean drainInProgressFlag = new AtomicBoolean(false);
// The list of assignments moved due to the last drain op on a leader. Used in UTs, and debugging.
private List<Assignment> assignmentsMovedInLastDrain;
@@ -136,13 +137,13 @@ public class SchedulerManager implements AutoCloseable {
DrainNotInProgress,
DrainInProgress,
DrainCompleted
- };
+ }
// A map to hold the status of recent drain operations.
// It is of the form {workerId : DrainOpStatus}.
// Entries are added when a drain operation starts, and removed on a periodic (when the worker is no longer seen
// on a poll).
- private ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<String, DrainOpStatus>();
+ private final ConcurrentHashMap<String, DrainOpStatus> drainOpStatusMap = new ConcurrentHashMap<>();
public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
@@ -248,8 +249,7 @@ public class SchedulerManager implements AutoCloseable {
rebalanceInProgress.set(false);
throw new TooFewWorkersException();
}
- currentRebalanceFuture = rebalance();
- return currentRebalanceFuture;
+ return rebalance();
} else {
throw new RebalanceInProgressException();
}
@@ -289,8 +289,7 @@ public class SchedulerManager implements AutoCloseable {
throw new UnknownWorkerException();
}
- currentDrainFuture = drain(workerId);
- return currentDrainFuture;
+ return drain(workerId);
} finally {
drainInProgressFlag.set(false);
}
@@ -301,37 +300,25 @@ public class SchedulerManager implements AutoCloseable {
public LongRunningProcessStatus getDrainStatus(String workerId) {
long startTime = System.nanoTime();
- String errString;
- LongRunningProcessStatus retVal = new LongRunningProcessStatus();
- try {
- val workerStatus = drainOpStatusMap.get(workerId);
- if (workerStatus == null) {
- errString = "Worker " + workerId + " not found in drain records";
- retVal = LongRunningProcessStatus.forError(errString);
- } else {
- switch (workerStatus) {
- default:
- errString =
- "getDrainStatus: Unexpected status " + workerStatus + " found for worker " + workerId;
- retVal = LongRunningProcessStatus.forError(errString);
- break;
- case DrainCompleted:
- retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
- break;
- case DrainInProgress:
- retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
- break;
- case DrainNotInProgress:
- retVal = LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
- break;
- }
- }
- } finally {
- log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
- workerId, (System.nanoTime() - startTime) / Math.pow(10, 9),
- retVal.status, retVal.lastError);
- return retVal;
- }
+ LongRunningProcessStatus status = Optional.ofNullable(workerId).map(id ->
+ Optional.ofNullable(drainOpStatusMap.get(id)).map(opStatus ->
+ switch (opStatus) {
+ case DrainCompleted ->
+ LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+ case DrainInProgress ->
+ LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+ case DrainNotInProgress ->
+ LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+ }).orElse(
+ LongRunningProcessStatus.forError("Worker " + id + " not found in drain records")
+ )
+ ).orElse(
+ new LongRunningProcessStatus()
+ );
+ log.info("Get drain status for worker {} - execution time: {} sec; returning status={}, error={}",
+ workerId, NANOSECONDS.toSeconds (System.nanoTime() - startTime),
+ status.status, status.lastError);
+ return status;
}
// The following method is used only for testing.
@@ -351,8 +338,7 @@ public class SchedulerManager implements AutoCloseable {
// The following method is used only for testing.
@VisibleForTesting
ConcurrentHashMap<String, DrainOpStatus> getDrainOpsStatusMap() {
- val retVal = new ConcurrentHashMap<String, DrainOpStatus>(drainOpStatusMap);
- return retVal;
+ return new ConcurrentHashMap<>(drainOpStatusMap);
}
private synchronized int getCurrentAvailableNumWorkers() {
@@ -361,16 +347,10 @@ public class SchedulerManager implements AutoCloseable {
private synchronized Set<String> getCurrentAvailableWorkers() {
Set<String> currentMembership = membershipManager.getCurrentMembership()
- .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
+ .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());
// iterate the set, instead of the concurrent hashmap
- Iterator<String> iter = currentMembership.iterator();
- while (iter.hasNext()) {
- if (drainOpStatusMap.containsKey(iter.next())) {
- iter.remove();
- }
- }
-
+ currentMembership.removeIf(drainOpStatusMap::containsKey);
return currentMembership;
}
@@ -445,11 +425,7 @@ public class SchedulerManager implements AutoCloseable {
String workerId = workerIdToAssignmentEntry.getKey();
// remove assignments to workers that don't exist / died for now.
// wait for failure detector to unassign them in the future for re-scheduling
- if (!availableWorkers.contains(workerId)) {
- return false;
- }
-
- return true;
+ return availableWorkers.contains(workerId);
})
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
@@ -505,11 +481,7 @@ public class SchedulerManager implements AutoCloseable {
String workerId = workerIdToAssignmentEntry.getKey();
// remove assignments to workers that don't exist / died for now.
// wait for failure detector to unassign them in the future for re-scheduling
- if (!availableWorkers.contains(workerId)) {
- return false;
- }
-
- return true;
+ return availableWorkers.contains(workerId);
})
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
.collect(Collectors.toList());
@@ -638,8 +610,8 @@ public class SchedulerManager implements AutoCloseable {
workerId, drainSuccessful ? "" : "un",
(System.nanoTime() - startTime) / Math.pow(10, 9),
schedulerStats.getSummary(), schedulerStats);
- return postDrainAssignments;
}
+ return postDrainAssignments;
}
private void compactAssignmentTopic() {
@@ -648,7 +620,7 @@ public class SchedulerManager implements AutoCloseable {
this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
- scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+ scheduledExecutorService.schedule(this::compactAssignmentTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
@@ -660,7 +632,7 @@ public class SchedulerManager implements AutoCloseable {
if (drainOpStatusMap.size() > 0) {
val currentMembership = membershipManager.getCurrentMembership()
- .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
+ .stream().map(WorkerInfo::getWorkerId).collect(Collectors.toSet());
val removeWorkerIds = new ArrayList<String>();
for (String workerId : drainOpStatusMap.keySet()) {
@@ -688,7 +660,7 @@ public class SchedulerManager implements AutoCloseable {
this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
- scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+ scheduledExecutorService.schedule(this::compactFunctionMetadataTopic, DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
@@ -708,7 +680,7 @@ public class SchedulerManager implements AutoCloseable {
}
private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
- boolean externallyManagedRuntime) {
+ boolean externallyManagedRuntime) {
Map<String, Function.Instance> functionInstances = new HashMap<>();
for (FunctionMetaData functionMetaData : allFunctions) {
for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
@@ -719,7 +691,7 @@ public class SchedulerManager implements AutoCloseable {
}
static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
- boolean externallyManagedRuntime) {
+ boolean externallyManagedRuntime) {
List<Function.Instance> functionInstances = new LinkedList<>();
if (!externallyManagedRuntime) {
int instances = functionMetaData.getFunctionDetails().getParallelism();
@@ -831,9 +803,9 @@ public class SchedulerManager implements AutoCloseable {
private boolean alive;
}
- private Map<String, WorkerStats> workerStatsMap = new HashMap<>();
+ private final Map<String, WorkerStats> workerStatsMap = new HashMap<>();
- private Map<String, String> instanceToWorkerId = new HashMap<>();
+ private final Map<String, String> instanceToWorkerId = new HashMap<>();
public SchedulerStats(Map<String, Map<String, Assignment>> workerIdToAssignments, Set<String> workers) {