You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/08 06:05:41 UTC

[pulsar] branch master updated: fix: NPE when there are assignments for workers not in membership (#2744)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e3203d  fix: NPE when there are assignments for workers not in membership (#2744)
7e3203d is described below

commit 7e3203d686e8dd81a019c5cb7441501ada556792
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Oct 7 23:05:36 2018 -0700

    fix: NPE when there are assignments for workers not in membership (#2744)
    
    ### Motivation
    
    Fixes #2743
    
    NPE occurs when there are assignments for workers not currently in the membership.  This could happen if the worker crashed before the failure detector unassigned the functions assigned to it and the scheduler is triggered by other means.
    
    ### Modifications
    
    Filter out the assignments for workers that are not in the membership.
---
 .../pulsar/functions/worker/SchedulerManager.java  |  21 ++-
 .../functions/worker/SchedulerManagerTest.java     | 144 +++++++++++++++------
 2 files changed, 120 insertions(+), 45 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 24217cf..28f5e99 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -31,8 +31,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import lombok.Getter;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -139,7 +141,7 @@ public class SchedulerManager implements AutoCloseable {
                         invokeScheduler();
                     } catch (Exception e) {
                         log.warn("Failed to invoke scheduler", e);
-                        schedule();
+                        throw e;
                     }
                 }
             }
@@ -167,6 +169,7 @@ public class SchedulerManager implements AutoCloseable {
         Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
         Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager
                 .getCurrentAssignments();
+
         //delete assignments of functions and instances that don't exist anymore
         Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator();
         while (it.hasNext()) {
@@ -199,8 +202,20 @@ public class SchedulerManager implements AutoCloseable {
         }
 
         List<Assignment> currentAssignments = workerIdToAssignments
-                .entrySet().stream()
-                .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()).collect(Collectors.toList());
+                .entrySet()
+                .stream()
+                .filter(workerIdToAssignmentEntry -> {
+                    String workerId = workerIdToAssignmentEntry.getKey();
+                    // remove assignments to workers that don't exist / died for now.
+                    // wait for failure detector to unassign them in the future for re-scheduling
+                    if (!currentMembership.contains(workerId)) {
+                        return false;
+                    }
+
+                    return true;
+                })
+                .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream())
+                .collect(Collectors.toList());
 
         Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments,
                 allInstances);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 6dde08c..3ec3072 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -18,32 +18,10 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertTrue;
-
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -62,11 +40,31 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
-import lombok.extern.slf4j.Slf4j;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
 
 @Slf4j
 public class SchedulerManagerTest {
@@ -187,6 +185,9 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -432,7 +433,7 @@ public class SchedulerManagerTest {
         Assert.assertEquals(invocations.size(), 4);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        
+
         Set<Assignment> allAssignments = Sets.newHashSet();
         invocations.forEach(invocation -> {
             try {
@@ -499,7 +500,7 @@ public class SchedulerManagerTest {
         Assignment assignments = Assignment.parseFrom(send);
 
         log.info("assignments: {}", assignments);
-        
+
         Set<Assignment> allAssignments = Sets.newHashSet();
         invocations.forEach(invocation -> {
             try {
@@ -525,11 +526,11 @@ public class SchedulerManagerTest {
                 .setInstance(Function.Instance.newBuilder()
                         .setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        
+
         assertTrue(allAssignments.contains(assignment2_1));
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
-        
+
         // scale down
 
         Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
@@ -555,7 +556,7 @@ public class SchedulerManagerTest {
                 Object.class));
         send = (byte[]) invocations.get(0).getRawArguments()[0];
         assignments = Assignment.parseFrom(send);
-        
+
         Set<Assignment> allAssignments2 = Sets.newHashSet();
         invocations.forEach(invocation -> {
             try {
@@ -623,7 +624,7 @@ public class SchedulerManagerTest {
             }
         });
     }
-    
+
     @Test
     public void testUpdate() throws Exception {
         List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
@@ -694,14 +695,14 @@ public class SchedulerManagerTest {
                 .setInstance(Function.Instance.newBuilder()
                         .setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
-        
+
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 3);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
         send = (byte[]) invocations.get(0).getRawArguments()[0];
         assignments = Assignment.parseFrom(send);
-        
+
         Set<Assignment> allAssignments = Sets.newHashSet();
         invocations.forEach(invocation -> {
             try {
@@ -710,11 +711,11 @@ public class SchedulerManagerTest {
                 throw new RuntimeException(e);
             }
         });
-        
+
         assertTrue(allAssignments.contains(assignment2_1));
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
-        
+
         // scale down
 
         Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder()
@@ -751,7 +752,7 @@ public class SchedulerManagerTest {
                 Object.class));
         send = (byte[]) invocations.get(0).getRawArguments()[0];
         assignments = Assignment.parseFrom(send);
-        
+
         Set<Assignment> allAssignments2 = Sets.newHashSet();
         invocations.forEach(invocation -> {
             try {
@@ -760,12 +761,71 @@ public class SchedulerManagerTest {
                 throw new RuntimeException(e);
             }
         });
-        
+
         assertTrue(allAssignments2.contains(assignment2Updated1));
         assertTrue(allAssignments2.contains(assignment2Updated2));
         assertTrue(allAssignments2.contains(assignment2Updated3));
     }
 
+    @Test
+    public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException, InvalidProtocolBufferException {
+        List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        long version = 5;
+        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
+                        .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
+                .build();
+
+        Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
+                        .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
+                .build();
+        functionMetaDataList.add(function1);
+        functionMetaDataList.add(function2);
+        doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
+
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
+        // set assignments
+        Function.Assignment assignment1 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-1")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function1).setInstanceId(0).build())
+                .build();
+
+        // set assignment to worker that doesn't exist / died
+        Function.Assignment assignment2 = Function.Assignment.newBuilder()
+                .setWorkerId("worker-2")
+                .setInstance(Function.Instance.newBuilder()
+                        .setFunctionMetaData(function2).setInstanceId(0).build())
+                .build();
+
+        Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
+        Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
+        assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        currentAssignments.put("worker-1", assignmentEntry1);
+
+        Map<String, Function.Assignment> assignmentEntry2 = new HashMap<>();
+        assignmentEntry2.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+        currentAssignments.put("worker-2", assignmentEntry2);
+
+        doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
+
+        // single node
+        List<WorkerInfo> workerInfoList = new LinkedList<>();
+        workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
+
+        // i am leader
+        doReturn(true).when(membershipManager).isLeader();
+
+        callSchedule();
+
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        Assert.assertEquals(invocations.size(), 0);
+    }
+
     private void callSchedule() throws NoSuchMethodException, InterruptedException,
             TimeoutException, ExecutionException {
         Future<?> complete = schedulerManager.schedule();