You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 03:51:58 UTC

[pulsar] branch master updated: fix: function update and related tests are broken (#2755)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d6155e  fix: function update and related tests are broken (#2755)
6d6155e is described below

commit 6d6155e0f73aaf8d5959bddb73ad1c0f70cc1e26
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Oct 9 22:51:53 2018 -0500

    fix: function update and related tests are broken (#2755)
    
    * fix: function update is broken since assignments of updated functions are not propagated
    
    * remove test code
---
 .../pulsar/functions/worker/SchedulerManager.java  |  5 +--
 .../functions/worker/SchedulerManagerTest.java     | 37 +++++++++++-----------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 28f5e99..c496766 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -194,6 +194,7 @@ public class SchedulerManager implements AutoCloseable {
 
                 if (!assignment.getInstance().equals(instance)) {
                     functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
+                    publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
                 }
             }
             if (functionMap.isEmpty()) {
@@ -222,7 +223,7 @@ public class SchedulerManager implements AutoCloseable {
 
         List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
         assignments.addAll(unassignedInstances.getRight());
-        
+
         if (log.isDebugEnabled()) {
             log.debug("New assignments computed: {}", assignments);
         }
@@ -251,7 +252,7 @@ public class SchedulerManager implements AutoCloseable {
         try {
             String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
             // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
-            // message 
+            // message
             producer.newMessage().key(fullyQualifiedInstanceId)
                     .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
         } catch (Exception e) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 3ec3072..0b8d177 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -64,6 +64,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 @Slf4j
@@ -400,6 +401,9 @@ public class SchedulerManagerTest {
                 .build();
         Assert.assertEquals(assignments, assignment2);
 
+        // updating assignments
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+
         // scale up
 
         Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
@@ -510,7 +514,6 @@ public class SchedulerManagerTest {
             }
         });
 
-
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -531,6 +534,11 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
 
+        // updating assignments
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+
         // scale down
 
         Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
@@ -551,11 +559,9 @@ public class SchedulerManagerTest {
         callSchedule();
 
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
-        Assert.assertEquals(invocations.size(), 4);
+        Assert.assertEquals(invocations.size(), 6);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        send = (byte[]) invocations.get(0).getRawArguments()[0];
-        assignments = Assignment.parseFrom(send);
 
         Set<Assignment> allAssignments2 = Sets.newHashSet();
         invocations.forEach(invocation -> {
@@ -671,15 +677,6 @@ public class SchedulerManagerTest {
 
         callSchedule();
 
-        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
-        Assert.assertEquals(invocations.size(), 3);
-        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
-                Object.class));
-        byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
-        Assignment assignments = Assignment.parseFrom(send);
-
-        log.info("assignmentsUpdate: {}", assignments);
-
         Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
                 .setInstance(Function.Instance.newBuilder()
@@ -696,12 +693,10 @@ public class SchedulerManagerTest {
                         .setFunctionMetaData(function2).setInstanceId(2).build())
                 .build();
 
-        invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+        List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
         Assert.assertEquals(invocations.size(), 3);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        send = (byte[]) invocations.get(0).getRawArguments()[0];
-        assignments = Assignment.parseFrom(send);
 
         Set<Assignment> allAssignments = Sets.newHashSet();
         invocations.forEach(invocation -> {
@@ -712,11 +707,17 @@ public class SchedulerManagerTest {
             }
         });
 
+        assertEquals(allAssignments.size(), 3);
         assertTrue(allAssignments.contains(assignment2_1));
         assertTrue(allAssignments.contains(assignment2_2));
         assertTrue(allAssignments.contains(assignment2_3));
 
-        // scale down
+        // updating assignments
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+        currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+
+        // update field
 
         Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder()
                 .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
@@ -750,8 +751,6 @@ public class SchedulerManagerTest {
         Assert.assertEquals(invocations.size(), 6);
         invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
                 Object.class));
-        send = (byte[]) invocations.get(0).getRawArguments()[0];
-        assignments = Assignment.parseFrom(send);
 
         Set<Assignment> allAssignments2 = Sets.newHashSet();
         invocations.forEach(invocation -> {