You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 03:51:58 UTC
[pulsar] branch master updated: fix: function update and related tests are broken (#2755)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d6155e fix: function update and related tests are broken (#2755)
6d6155e is described below
commit 6d6155e0f73aaf8d5959bddb73ad1c0f70cc1e26
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Oct 9 22:51:53 2018 -0500
fix: function update and related tests are broken (#2755)
* fix: function update is broken since assignments of updated functions are not propagated
* remove test code
---
.../pulsar/functions/worker/SchedulerManager.java | 5 +--
.../functions/worker/SchedulerManagerTest.java | 37 +++++++++++-----------
2 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 28f5e99..c496766 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -194,6 +194,7 @@ public class SchedulerManager implements AutoCloseable {
if (!assignment.getInstance().equals(instance)) {
functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
+ publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
}
}
if (functionMap.isEmpty()) {
@@ -222,7 +223,7 @@ public class SchedulerManager implements AutoCloseable {
List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
assignments.addAll(unassignedInstances.getRight());
-
+
if (log.isDebugEnabled()) {
log.debug("New assignments computed: {}", assignments);
}
@@ -251,7 +252,7 @@ public class SchedulerManager implements AutoCloseable {
try {
String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
// publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
- // message
+ // message
producer.newMessage().key(fullyQualifiedInstanceId)
.value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
} catch (Exception e) {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 3ec3072..0b8d177 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -64,6 +64,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Slf4j
@@ -400,6 +401,9 @@ public class SchedulerManagerTest {
.build();
Assert.assertEquals(assignments, assignment2);
+ // updating assignments
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+
// scale up
Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
@@ -510,7 +514,6 @@ public class SchedulerManagerTest {
}
});
-
Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
@@ -531,6 +534,11 @@ public class SchedulerManagerTest {
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));
+ // updating assignments
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+
// scale down
Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
@@ -551,11 +559,9 @@ public class SchedulerManagerTest {
callSchedule();
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
- Assert.assertEquals(invocations.size(), 4);
+ Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
- send = (byte[]) invocations.get(0).getRawArguments()[0];
- assignments = Assignment.parseFrom(send);
Set<Assignment> allAssignments2 = Sets.newHashSet();
invocations.forEach(invocation -> {
@@ -671,15 +677,6 @@ public class SchedulerManagerTest {
callSchedule();
- List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
- Assert.assertEquals(invocations.size(), 3);
- invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
- Object.class));
- byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
- Assignment assignments = Assignment.parseFrom(send);
-
- log.info("assignmentsUpdate: {}", assignments);
-
Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
@@ -696,12 +693,10 @@ public class SchedulerManagerTest {
.setFunctionMetaData(function2).setInstanceId(2).build())
.build();
- invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
+ List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
Assert.assertEquals(invocations.size(), 3);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
- send = (byte[]) invocations.get(0).getRawArguments()[0];
- assignments = Assignment.parseFrom(send);
Set<Assignment> allAssignments = Sets.newHashSet();
invocations.forEach(invocation -> {
@@ -712,11 +707,17 @@ public class SchedulerManagerTest {
}
});
+ assertEquals(allAssignments.size(), 3);
assertTrue(allAssignments.contains(assignment2_1));
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));
- // scale down
+ // updating assignments
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+ currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+
+ // update field
Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
@@ -750,8 +751,6 @@ public class SchedulerManagerTest {
Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
- send = (byte[]) invocations.get(0).getRawArguments()[0];
- assignments = Assignment.parseFrom(send);
Set<Assignment> allAssignments2 = Sets.newHashSet();
invocations.forEach(invocation -> {