You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/02 22:00:04 UTC

[GitHub] rdhabalia closed pull request #2613: Avoid scheduling heartbeat function if owner-worker not available

rdhabalia closed pull request #2613: Avoid scheduling heartbeat function if owner-worker not available
URL: https://github.com/apache/pulsar/pull/2613
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not display its
diff once the pull request is merged, so the diff is reproduced below:

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12881..40a2fd1521 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -46,6 +46,7 @@
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
 
 /**
  * A simple implementation of leader election using a pulsar topic.
@@ -239,6 +240,10 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
             if (!currentMembership.contains(workerId)) {
                 for (Function.Assignment assignmentEntry : assignmentEntries.values()) {
                     Function.Instance instance = assignmentEntry.getInstance();
+                    // skip heartbeat functions: do not trigger rescheduling while their owner worker is down
+                    if (checkHeartBeatFunction(instance) != null) {
+                        continue;
+                    }
                     if (!this.unsignedFunctionDurations.containsKey(instance)) {
                         this.unsignedFunctionDurations.put(instance, currentTimeMs);
                     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index db7785a55b..6d24294cbb 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,17 +18,23 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
+
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.CompressionType;
@@ -37,11 +43,14 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -70,6 +79,8 @@
     
     AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
     private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; 
+    public static final String HEARTBEAT_TENANT = "pulsar-function";
+    public static final String HEARTBEAT_NAMESPACE = "heartbeat";
 
     public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
         this.workerConfig = workerConfig;
@@ -122,8 +133,8 @@ private void scheduleCompaction(ScheduledExecutorService executor, long schedule
     @VisibleForTesting
     public void invokeScheduler() {
         
-        List<String> currentMembership = this.membershipManager.getCurrentMembership()
-                .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());
+        Set<String> currentMembership = this.membershipManager.getCurrentMembership()
+                .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
 
         List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
         Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions);
@@ -164,12 +175,12 @@ public void invokeScheduler() {
                 .entrySet().stream()
                 .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()).collect(Collectors.toList());
 
-        List<Function.Instance> needsAssignment = this.getUnassignedFunctionInstances(workerIdToAssignments,
+        Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments,
                 allInstances);
 
-        List<Assignment> assignments = this.scheduler.schedule(
-                needsAssignment, currentAssignments, currentMembership);
-
+        List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
+        assignments.addAll(unassignedInstances.getRight());
+        
         if (log.isDebugEnabled()) {
             log.debug("New assignments computed: {}", assignments);
         }
@@ -229,10 +240,11 @@ private void publishNewAssignment(Assignment assignment, boolean deleted) {
         return functionInstances;
     }
 
-    private List<Function.Instance> getUnassignedFunctionInstances(
+    private Pair<List<Function.Instance>, List<Assignment>> getUnassignedFunctionInstances(
             Map<String, Map<String, Assignment>> currentAssignments, Map<String, Function.Instance> functionInstances) {
 
         List<Function.Instance> unassignedFunctionInstances = new LinkedList<>();
+        List<Assignment> heartBeatAssignments = Lists.newArrayList();
         Map<String, Assignment> assignmentMap = new HashMap<>();
         for (Map<String, Assignment> entry : currentAssignments.values()) {
             assignmentMap.putAll(entry);
@@ -241,11 +253,17 @@ private void publishNewAssignment(Assignment assignment, boolean deleted) {
         for (Map.Entry<String, Function.Instance> instanceEntry : functionInstances.entrySet()) {
             String fullyQualifiedInstanceId = instanceEntry.getKey();
             Function.Instance instance = instanceEntry.getValue();
+            String heartBeatWorkerId = checkHeartBeatFunction(instance);
+            if (heartBeatWorkerId != null) {
+                heartBeatAssignments
+                        .add(Assignment.newBuilder().setInstance(instance).setWorkerId(heartBeatWorkerId).build());
+                continue;
+            }
             if (!assignmentMap.containsKey(fullyQualifiedInstanceId)) {
                 unassignedFunctionInstances.add(instance);
             }
         }
-        return unassignedFunctionInstances;
+        return ImmutablePair.of(unassignedFunctionInstances, heartBeatAssignments);
     }
 
     @Override
@@ -256,4 +274,14 @@ public void close() {
             log.warn("Failed to shutdown scheduler manager assignment producer", e);
         }
     }
+    
+    public static String checkHeartBeatFunction(Instance funInstance) {
+        if (funInstance.getFunctionMetaData() != null
+                && funInstance.getFunctionMetaData().getFunctionDetails() != null) {
+            FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
+            return HEARTBEAT_TENANT.equals(funDetails.getTenant())
+                    && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null;
+        }
+        return null;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java
index 30fd123aa7..e19ca71a44 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java
@@ -22,9 +22,20 @@
 import org.apache.pulsar.functions.proto.Function.Instance;
 
 import java.util.List;
+import java.util.Set;
 
 public interface IScheduler {
 
-    List<Assignment> schedule(List<Instance> unassignedFunctionInstances,
-                              List<Assignment> currentAssignments, List<String> workers);
+    /**
+     * Schedules the unassigned function instances onto the given workers and returns the new assignments.
+     * 
+     * @param unassignedFunctionInstances
+     *            all unassigned instances
+     * @param currentAssignments
+     *            current assignments
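+     * @param workers ids of the workers that instances may be assigned to
+     * @return the assignments created for the unassigned instances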
+     */
+    List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment> currentAssignments,
+            Set<String> workers);
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 817962d4f1..9347ed9fda 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.scheduler;
 
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Instance;
 
 import com.google.common.collect.Lists;
@@ -28,18 +27,16 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
 
 public class RoundRobinScheduler implements IScheduler {
 
-    public static final String HEARTBEAT_TENANT = "pulsar-function";
-    public static final String HEARTBEAT_NAMESPACE = "heartbeat";
-    
     @Override
-    public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment>
-            currentAssignments, List<String> workers) {
+    public List<Assignment> schedule(List<Instance> unassignedFunctionInstances,
+            List<Assignment> currentAssignments, Set<String> workers) {
 
         Map<String, List<Assignment>> workerIdToAssignment = new HashMap<>();
+        List<Assignment> newAssignments = Lists.newArrayList();
 
         for (String workerId : workers) {
             workerIdToAssignment.put(workerId, new LinkedList<>());
@@ -49,10 +46,8 @@
             workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment);
         }
 
-        List<Assignment> newAssignments = Lists.newArrayList();
         for (Instance unassignedFunctionInstance : unassignedFunctionInstances) {
-            String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance);
-            String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment);
+            String workerId = findNextWorker(workerIdToAssignment);
             Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance)
                     .setWorkerId(workerId).build();
             workerIdToAssignment.get(workerId).add(newAssignment);
@@ -62,16 +57,6 @@
         return newAssignments;
     }
 
-    private static String checkHeartBeatFunction(Instance funInstance) {
-        if (funInstance.getFunctionMetaData() != null
-                && funInstance.getFunctionMetaData().getFunctionDetails() != null) {
-            FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
-            return HEARTBEAT_TENANT.equals(funDetails.getTenant())
-                    && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null;
-        }
-        return null;
-    }
-
     private String findNextWorker(Map<String, List<Assignment>> workerIdToAssignment) {
         String targetWorkerId = null;
         int least = Integer.MAX_VALUE;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index c849fd36de..a2fb6207a3 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -48,6 +48,7 @@
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -359,4 +360,78 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
         verify(schedulerManager, times(1)).schedule();
         verify(functionRuntimeManager, times(0)).removeAssignments(any());
     }
+
+    @Test
+    public void testHeartBeatFunctionWorkerDown() throws Exception {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setWorkerId("worker-1");
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+        workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+        workerConfig.setStateStorageServiceUrl("foo");
+        workerConfig.setRescheduleTimeoutMs(30000);
+        SchedulerManager schedulerManager = mock(SchedulerManager.class);
+        PulsarClient pulsarClient = mockPulsarClient();
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
+        doReturn(readerBuilder).when(pulsarClient).newReader();
+        doReturn(readerBuilder).when(readerBuilder).topic(anyString());
+        doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
+        doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
+        doReturn(mock(Reader.class)).when(readerBuilder).create();
+        WorkerService workerService = mock(WorkerService.class);
+        doReturn(pulsarClient).when(workerService).getClient();
+        doReturn(workerConfig).when(workerService).getWorkerConfig();
+        doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
+        
+        FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
+                workerConfig,
+                workerService,
+                mock(Namespace.class),
+                mock(MembershipManager.class),
+                mock(ConnectorsManager.class)
+        ));
+        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+        MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));
+
+        List<WorkerInfo> workerInfoList = new LinkedList<>();
+        workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
+        // make worker-2 unavailable
+        //workerInfoList.add(WorkerInfo.of("worker-2", "host-2", 8001));
+
+        Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
+
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setParallelism(1)
+                        .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setParallelism(1)
+                        .setTenant(SchedulerManager.HEARTBEAT_TENANT)
+                        .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE).setName("worker-2"))
+                .build();
+
+        List<Function.FunctionMetaData> metaDataList = new LinkedList<>();
+        metaDataList.add(function1);
+        metaDataList.add(function2);
+
+        Mockito.doReturn(metaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1").setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-2").setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        // add existing assignments
+        functionRuntimeManager.setAssignment(assignment1);
+        functionRuntimeManager.setAssignment(assignment2);
+
+        membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager);
+
+        verify(schedulerManager, times(0)).schedule();
+        verify(functionRuntimeManager, times(0)).removeAssignments(any());
+        Assert.assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
+    }
+    
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 00e4b6e0e1..53669e449c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -573,14 +573,14 @@ public void testHeartbeatFunction() throws Exception {
         final String workerId2 = "host-workerId-2";
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1)
-                        .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
-                        .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+                        .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE)
+                        .setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1))
                 .setVersion(version).build();
 
         Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2)
-                        .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
-                        .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
+                        .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE)
+                        .setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1))
                 .setVersion(version).build();
         functionMetaDataList.add(function1);
         functionMetaDataList.add(function2);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services