You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/01/14 17:54:29 UTC
[pulsar] branch master updated: Cleanup consumer subscriptions and
fix graceful shutdown for functions/sinks (#3299)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 034f6ba Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299)
034f6ba is described below
commit 034f6ba04e9c48abec1517668cb4fa46efdf02bc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Jan 14 09:54:23 2019 -0800
Cleanup consumer subscriptions and fix graceful shutdown for functions/sinks (#3299)
* Cleanup consumer subscriptions and fix graceful shutdown for functions
* cleaning up
* removing testing files
* add unit tests
* adding integration testing
* refactoring
* refactoring and adding tests
* cleaning up
---
.../worker/PulsarWorkerAssignmentTest.java | 2 +
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 168 ++++++++++++++++++++-
.../pulsar/common/functions/FunctionConfig.java | 2 +
.../org/apache/pulsar/common/io/SinkConfig.java | 3 +-
.../pulsar/functions/instance/InstanceUtils.java | 12 ++
.../functions/instance/JavaInstanceRunnable.java | 4 +-
.../functions/source/PulsarSourceConfig.java | 1 +
.../instance/src/main/python/Function_pb2.py | 47 +++---
.../instance/src/main/python/function_stats.py | 2 +-
.../instance/src/main/python/python_instance.py | 22 ++-
.../src/main/python/python_instance_main.py | 4 +-
pulsar-functions/instance/src/main/python/util.py | 4 +-
.../proto/src/main/proto/Function.proto | 1 +
.../pulsar/functions/runtime/ProcessRuntime.java | 29 +++-
.../pulsar/functions/runtime/ThreadRuntime.java | 2 +
.../functions/utils/FunctionConfigUtils.java | 5 +
.../pulsar/functions/utils/SinkConfigUtils.java | 6 +
.../pulsar/functions/worker/FunctionAction.java | 3 +-
.../pulsar/functions/worker/FunctionActioner.java | 85 +++++++++--
.../functions/worker/FunctionRuntimeManager.java | 34 ++++-
.../pulsar/functions/worker/WorkerService.java | 8 +-
.../functions/worker/FunctionActionerTest.java | 5 +-
.../worker/FunctionRuntimeManagerTest.java | 25 +--
.../functions/worker/MembershipManagerTest.java | 32 ++--
.../integration/functions/PulsarFunctionsTest.java | 20 +++
25 files changed, 439 insertions(+), 87 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 5cdf5a4..4b4597e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import com.google.gson.Gson;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -213,6 +214,7 @@ public class PulsarWorkerAssignmentTest {
}
}, 5, 150);
// validate pulsar sink consumer has started on the topic
+ log.info("admin.topics().getStats(sinkTopic): {}", new Gson().toJson(admin.topics().getStats(sinkTopic)));
assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 1);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 94e7bfa..f04fede 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -20,11 +20,13 @@ package org.apache.pulsar.io;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.gson.Gson;
import lombok.ToString;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
@@ -42,11 +44,13 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -119,6 +123,7 @@ public class PulsarFunctionE2ETest {
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
@@ -148,14 +153,24 @@ public class PulsarFunctionE2ETest {
config.setBrokerServicePort(brokerServicePort);
config.setBrokerServicePortTls(brokerServiceTlsPort);
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+ config.setTlsAllowInsecureConnection(true);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
+
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- config.setTlsAllowInsecureConnection(true);
+ config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+ config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ config.setBrokerClientAuthenticationParameters(
+ "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+ config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ config.setBrokerClientTlsEnabled(true);
+
+
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
@@ -170,7 +185,7 @@ public class PulsarFunctionE2ETest {
authTls.configure(authParams);
admin = spy(
- PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.allowTlsInsecureConnection(true).authentication(authTls).build());
brokerStatsClient = admin.brokerStats();
@@ -211,6 +226,7 @@ public class PulsarFunctionE2ETest {
}
private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
workerConfig = new WorkerConfig();
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
@@ -237,7 +253,7 @@ public class PulsarFunctionE2ETest {
String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
workerConfig.setUseTls(true);
workerConfig.setTlsAllowInsecureConnection(true);
- workerConfig.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
+ workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);
@@ -260,6 +276,7 @@ public class PulsarFunctionE2ETest {
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput(sinkTopic);
+ functionConfig.setCleanupSubscription(true);
return functionConfig;
}
@@ -283,6 +300,7 @@ public class PulsarFunctionE2ETest {
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sinkConfig.setInputs(Collections.singleton(sourceTopic));
sinkConfig.setSourceSubscriptionName(subName);
+ sinkConfig.setCleanupSubscription(true);
return sinkConfig;
}
/**
@@ -350,6 +368,20 @@ public class PulsarFunctionE2ETest {
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
}
@Test(timeOut = 20000)
@@ -535,6 +567,21 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertTrue(m.value > 0.0);
+
+
+ // delete sink
+ admin.sink().deleteSink(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
}
@Test(timeOut = 20000)
@@ -945,6 +992,20 @@ public class PulsarFunctionE2ETest {
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.value, (double) totalMsgs);
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
}
@Test(timeOut = 20000)
@@ -1012,6 +1073,20 @@ public class PulsarFunctionE2ETest {
assertEquals((int)count, totalMsgs);
assertEquals((int) success, totalMsgs);
assertEquals(ownerWorkerId, workerId);
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
}
@Test(dataProvider = "validRoleName")
@@ -1110,6 +1185,93 @@ public class PulsarFunctionE2ETest {
producer.close();
}
+ @Test(timeOut = 20000)
+ public void testFunctionAutomaticSubCleanup() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarFunction-test";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespacePortion);
+ functionConfig.setName(functionName);
+ functionConfig.setParallelism(1);
+ functionConfig.setInputs(Collections.singleton(sourceTopic));
+ functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ functionConfig.setOutput(sinkTopic);
+ functionConfig.setCleanupSubscription(true);
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar source consumer has started on the topic
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(
+ InstanceUtils.getDefaultSubscriptionName(tenant, namespacePortion, functionName));
+ return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 200);
+
+ FunctionStatus functionStatus = admin.functions().getFunctionStatus(tenant, namespacePortion,
+ functionName);
+
+ int numInstances = functionStatus.getNumInstances();
+ assertEquals(numInstances, 1);
+
+ FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData status
+ = functionStatus.getInstances().get(0).getStatus();
+
+ double count = status.getNumReceived();
+ double success = status.getNumSuccessfullyProcessed();
+ String ownerWorkerId = status.getWorkerId();
+ assertEquals((int)count, totalMsgs);
+ assertEquals((int) success, totalMsgs);
+ assertEquals(ownerWorkerId, workerId);
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+ }
+
+
public static String getPrometheusMetrics(int metricsPort) throws IOException {
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s/metrics", InetAddress.getLocalHost().getHostAddress(), metricsPort));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index bf27155..68a9a7d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -93,4 +93,6 @@ public class FunctionConfig {
private Long timeoutMs;
private String jar;
private String py;
+ // Whether the subscriptions the function created/used should be deleted when the function is deleted
+ private Boolean cleanupSubscription;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index b85de06..abb2067 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -66,6 +66,7 @@ public class SinkConfig {
private Resources resources;
private Boolean autoAck;
private Long timeoutMs;
-
private String archive;
+ // Whether the subscriptions the sink created/used should be deleted when the sink is deleted
+ private Boolean cleanupSubscription;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 88a9df3..9db47cf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
@@ -107,6 +108,17 @@ public class InstanceUtils {
return SINK;
}
+ public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
+ return FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+ }
+
+ public static String getDefaultSubscriptionName(Function.FunctionDetails functionDetails) {
+ return getDefaultSubscriptionName(
+ functionDetails.getTenant(),
+ functionDetails.getNamespace(),
+ functionDetails.getName());
+ }
+
public static Map<String, String> getProperties(Utils.ComponentType componentType,
String fullyQualifiedName, int instanceId) {
Map<String, String> properties = new HashMap<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fde9628..668b8bd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -299,7 +299,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private void loadJars() throws Exception {
try {
- log.info("jarFile: {}", jarFile);
+ log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
fnCache.registerFunctionInstanceWithArchive(
instanceConfig.getFunctionId(),
@@ -634,7 +634,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setSubscriptionName(
StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName()
- : FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+ : InstanceUtils.getDefaultSubscriptionName(instanceConfig.getFunctionDetails()));
pulsarSourceConfig.setProcessingGuarantees(
FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index cf843be..ba7de68 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,7 @@ public class PulsarSourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
SubscriptionType subscriptionType;
private String subscriptionName;
+ // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Integer maxMessageRetries = -1;
private String deadLetterTopic;
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 3979d49..f5141d4 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='Function.proto',
package='proto',
syntax='proto3',
- serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+ serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1744,
- serialized_end=1823,
+ serialized_start=1773,
+ serialized_end=1852,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1825,
- serialized_end=1869,
+ serialized_start=1854,
+ serialized_end=1898,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -420,8 +420,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1073,
- serialized_end=1134,
+ serialized_start=1102,
+ serialized_end=1163,
)
_SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -457,8 +457,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1136,
- serialized_end=1206,
+ serialized_start=1165,
+ serialized_end=1235,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -538,6 +538,13 @@ _SOURCESPEC = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10,
+ number=11, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -551,7 +558,7 @@ _SOURCESPEC = _descriptor.Descriptor(
oneofs=[
],
serialized_start=722,
- serialized_end=1206,
+ serialized_end=1235,
)
@@ -623,8 +630,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1209,
- serialized_end=1354,
+ serialized_start=1238,
+ serialized_end=1383,
)
@@ -661,8 +668,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1356,
- serialized_end=1428,
+ serialized_start=1385,
+ serialized_end=1457,
)
@@ -713,8 +720,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1431,
- serialized_end=1592,
+ serialized_start=1460,
+ serialized_end=1621,
)
@@ -751,8 +758,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1594,
- serialized_end=1675,
+ serialized_start=1623,
+ serialized_end=1704,
)
@@ -789,8 +796,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1677,
- serialized_end=1742,
+ serialized_start=1706,
+ serialized_end=1771,
)
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index d3dc322..34793a5 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -103,7 +103,7 @@ class Stats(object):
self._stat_total_received_1min = self.stat_total_received_1min.labels(*self.metrics_labels)
# start time for windowed metrics
- util.FixedTimer(60, self.reset).start()
+ util.FixedTimer(60, self.reset, name="windowed-metrics-timer").start()
def get_total_received(self):
return self._stat_total_received._value.get();
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index cfd9eae..d86173b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -42,7 +42,6 @@ import InstanceCommunication_pb2
from functools import partial
from collections import namedtuple
-from threading import Timer
from function_stats import Stats
Log = log.Log
@@ -111,8 +110,6 @@ class PythonInstance(object):
os.kill(os.getpid(), signal.SIGKILL)
sys.exit(1)
- Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
-
def run(self):
# Setup consumers and input deserializers
mode = pulsar._pulsar.ConsumerType.Shared
@@ -187,7 +184,8 @@ class PythonInstance(object):
# start proccess spawner health check timer
self.last_health_check_ts = time.time()
if self.expected_healthcheck_interval > 0:
- Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
+ timer = util.FixedTimer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer, name="health-check-timer")
+ timer.start()
def actual_execution(self):
Log.debug("Started Thread for executing the function")
@@ -384,3 +382,19 @@ class PythonInstance(object):
def join(self):
self.queue.put(InternalQuitMessage(True), True)
self.execution_thread.join()
+ self.close()
+
+ def close(self):
+ Log.info("Closing python instance...")
+ if self.producer:
+ self.producer.close()
+
+ if self.consumers:
+ for consumer in self.consumers.values():
+ try:
+ consumer.close()
+ except:
+ pass
+
+ if self.pulsar_client:
+ self.pulsar_client.close()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index b7b1bfc..c93d6e1 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -31,6 +31,7 @@ import time
import zipfile
import json
import inspect
+import threading
import pulsar
@@ -193,7 +194,8 @@ def main():
time.sleep(1)
pyinstance.join()
- sys.exit(1)
+ # make sure to close all non-daemon threads before this!
+ sys.exit(0)
if __name__ == '__main__':
main()
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 0978f39..390aed1 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -76,10 +76,12 @@ def get_properties(fullyQualifiedName, instanceId):
class FixedTimer():
- def __init__(self, t, hFunction):
+ def __init__(self, t, hFunction, name="timer-thread"):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
+ self.thread.setName(name)
+ self.thread.setDaemon(True)
def handle_function(self):
self.hFunction()
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 8d93764..cb5021b 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -98,6 +98,7 @@ message SourceSpec {
* already present in the server */
string builtin = 8;
string subscriptionName = 9;
+ bool cleanupSubscription = 11;
}
message SinkSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index d3046ba..14e68cc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -179,18 +179,39 @@ class ProcessRuntime implements Runtime {
}
@Override
- public void stop() {
+ public void stop() throws InterruptedException {
if (timer != null) {
timer.cancel(false);
}
- if (process != null) {
- process.destroyForcibly();
- }
if (channel != null) {
channel.shutdown();
}
channel = null;
stub = null;
+
+ // kill process
+ if (process != null) {
+ process.destroy();
+ int i = 0;
+ // gracefully terminate at first
+ while(process.isAlive()) {
+ Thread.sleep(100);
+ if (i > 100) {
+ break;
+ }
+ i++;
+ }
+
+ // forcibly kill after timeout
+ if (process.isAlive()) {
+ log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...",
+ Utils.getFullyQualifiedInstanceId(
+ instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId()));
+ process.destroyForcibly();
+ }
+ }
}
@Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index be049c3..2bd4644 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -124,6 +124,8 @@ class ThreadRuntime implements Runtime {
} catch (InterruptedException e) {
// ignore this
}
+ // make sure JavaInstanceRunnable is closed
+ this.javaInstanceRunnable.close();
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 2459cfd..04300db 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -118,6 +118,11 @@ public class FunctionConfigUtils {
if (functionConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
}
+ if (functionConfig.getCleanupSubscription() != null) {
+ sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
+ } else {
+ sourceSpecBuilder.setCleanupSubscription(true);
+ }
functionDetailsBuilder.setSource(sourceSpecBuilder);
// Setup sink
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 413070a..a972947 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -151,6 +151,12 @@ public class SinkConfigUtils {
sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
}
+ if (sinkConfig.getCleanupSubscription() != null) {
+ sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
+ } else {
+ sourceSpecBuilder.setCleanupSubscription(true);
+ }
+
functionDetailsBuilder.setSource(sourceSpecBuilder);
// set up sink spec
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
index 23a8154..ded8268 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
@@ -31,7 +31,8 @@ public class FunctionAction {
public enum Action {
START,
- STOP
+ STOP,
+ TERMINATE
}
private Action action;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 46343a5..2ab4828 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -37,10 +38,13 @@ import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import com.google.gson.Gson;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -50,8 +54,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder;
@@ -78,35 +86,43 @@ public class FunctionActioner implements AutoCloseable {
private volatile boolean running;
private Thread actioner;
private final ConnectorsManager connectorsManager;
+ private final PulsarAdmin pulsarAdmin;
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
LinkedBlockingQueue<FunctionAction> actionQueue,
- ConnectorsManager connectorsManager) {
+ ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.actionQueue = actionQueue;
this.connectorsManager = connectorsManager;
+ this.pulsarAdmin = pulsarAdmin;
actioner = new Thread(() -> {
log.info("Starting Actioner Thread...");
while(running) {
try {
FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS);
if (action == null) continue;
- if (action.getAction() == FunctionAction.Action.START) {
- try {
- startFunction(action.getFunctionRuntimeInfo());
- } catch (Exception ex) {
- FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance()
- .getFunctionMetaData().getFunctionDetails();
- log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
- details.getName(), ex);
- action.getFunctionRuntimeInfo().setStartupException(ex);
- }
- } else {
- stopFunction(action.getFunctionRuntimeInfo());
+ switch (action.getAction()) {
+ case START:
+ try {
+ startFunction(action.getFunctionRuntimeInfo());
+ } catch (Exception ex) {
+ FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance()
+ .getFunctionMetaData().getFunctionDetails();
+ log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
+ details.getName(), ex);
+ action.getFunctionRuntimeInfo().setStartupException(ex);
+ }
+ break;
+ case STOP:
+ stopFunction(action.getFunctionRuntimeInfo());
+ break;
+ case TERMINATE:
+ terminateFunction(action.getFunctionRuntimeInfo());
+ break;
}
} catch (InterruptedException ex) {
}
@@ -266,6 +282,49 @@ public class FunctionActioner implements AutoCloseable {
}
}
+ private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+ FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+ .getFunctionDetails();
+ log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(),
+ functionRuntimeInfo.getFunctionInstance().getInstanceId());
+
+ stopFunction(functionRuntimeInfo);
+ //cleanup subscriptions
+ if (details.getSource().getCleanupSubscription()) {
+ Map<String, Function.ConsumerSpec> consumerSpecMap = details.getSource().getInputSpecsMap();
+ consumerSpecMap.entrySet().forEach(new Consumer<Map.Entry<String, Function.ConsumerSpec>>() {
+ @Override
+ public void accept(Map.Entry<String, Function.ConsumerSpec> stringConsumerSpecEntry) {
+
+ Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue();
+ String topic = stringConsumerSpecEntry.getKey();
+ String subscriptionName = functionRuntimeInfo
+ .getFunctionInstance().getFunctionMetaData()
+ .getFunctionDetails().getSource().getSubscriptionName();
+ // if the user-specified subscription name is empty, use the default subscription name
+ if (isBlank(subscriptionName)) {
+ subscriptionName = InstanceUtils.getDefaultSubscriptionName(
+ functionRuntimeInfo.getFunctionInstance()
+ .getFunctionMetaData().getFunctionDetails());
+ }
+
+ try {
+ if (consumerSpec.getIsRegexPattern()) {
+ pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get(topic).getNamespace(), subscriptionName);
+ } else {
+ pulsarAdmin.topics().deleteSubscription(topic, subscriptionName);
+ }
+ } catch (PulsarAdminException e) {
+ log.warn("Failed to cleanup {} subscription for {}", subscriptionName,
+ FunctionDetailsUtils.getFullyQualifiedName(
+ functionRuntimeInfo.getFunctionInstance()
+ .getFunctionMetaData().getFunctionDetails()), e);
+ }
+ }
+ });
+ }
+ }
+
private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
return StringUtils.join(
new String[]{
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 152c1db..fdd62d7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
@@ -102,8 +103,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Getter
boolean isInitializePhase = false;
+ private final FunctionMetaDataManager functionMetaDataManager;
+
public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
- MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
+ MembershipManager membershipManager, ConnectorsManager connectorsManager,
+ FunctionMetaDataManager functionMetaDataManager) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();
@@ -168,9 +172,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.actionQueue = new LinkedBlockingQueue<>();
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
- dlogNamespace, actionQueue, connectorsManager);
+ dlogNamespace, actionQueue, connectorsManager, workerService.getBrokerAdmin());
this.membershipManager = membershipManager;
+ this.functionMetaDataManager = functionMetaDataManager;
}
/**
@@ -636,7 +641,17 @@ public class FunctionRuntimeManager implements AutoCloseable{
public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
- this.insertStopAction(functionRuntimeInfo);
+ Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+
+ // check if this is part of a function delete operation or update operation
+ // TODO there could be a race condition here if functionMetaDataTailer somehow does not receive the functionMeta before the functionAssignmentsTailer gets the assignment for the function.
+ if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName())) {
+ // function still exists thus probably an update or stop operation
+ this.insertStopAction(functionRuntimeInfo);
+ } else {
+ // function doesn't exist anymore thus we should terminate
+ this.insertTerminateAction(functionRuntimeInfo);
+ }
this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
}
@@ -729,6 +744,19 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
+ void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) {
+ if (!this.isInitializePhase) {
+ FunctionAction functionAction = new FunctionAction();
+ functionAction.setAction(FunctionAction.Action.TERMINATE);
+ functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
+ try {
+ actionQueue.put(functionAction);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Interrupted while putting action");
+ }
+ }
+ }
+
private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
String fullyQualifiedInstanceId
= org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 43404ce..30d5cbd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -163,10 +163,7 @@ public class WorkerService {
// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
- this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager);
-
- // initialize function runtime manager
- this.functionRuntimeManager.initialize();
+ this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionMetaDataManager);
// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
@@ -176,6 +173,9 @@ public class WorkerService {
// initialize function metadata manager
this.functionMetaDataManager.initialize();
+ // initialize function runtime manager
+ this.functionRuntimeManager.initialize();
+
authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
// Starting cluster services
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index a4926e3..65ead73 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.runtime.Runtime;
@@ -69,7 +70,7 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue,
- new ConnectorsManager(workerConfig));
+ new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
Runtime runtime = mock(Runtime.class);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
@@ -112,7 +113,7 @@ public class FunctionActionerTest {
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue,
- new ConnectorsManager(workerConfig));
+ new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
// RuntimeSpawner
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 651e5d0..17d6642 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -85,8 +85,8 @@ public class FunctionRuntimeManagerTest {
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class)));
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -179,8 +179,8 @@ public class FunctionRuntimeManagerTest {
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class)));
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -225,8 +225,8 @@ public class FunctionRuntimeManagerTest {
.get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2);
verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
- verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
- verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
+ verify(functionRuntimeManager, times(1)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+ verify(functionRuntimeManager).insertTerminateAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
public boolean matches(Object o) {
if (o instanceof FunctionRuntimeInfo) {
@@ -244,7 +244,7 @@ public class FunctionRuntimeManagerTest {
Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
new FunctionAction()
- .setAction(FunctionAction.Action.STOP)
+ .setAction(FunctionAction.Action.TERMINATE)
.setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance(
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
.build()))));
@@ -277,8 +277,8 @@ public class FunctionRuntimeManagerTest {
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class)));
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -323,6 +323,9 @@ public class FunctionRuntimeManagerTest {
functionRuntimeManager.processAssignment(assignment3);
verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+ // make sure terminate is not called since this is an update operation
+ verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+
verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() {
@Override
public boolean matches(Object o) {
@@ -470,8 +473,8 @@ public class FunctionRuntimeManagerTest {
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class)));
functionRuntimeManager.initialize();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 678d338..527557b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -138,15 +138,15 @@ public class MembershipManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
+
+ FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
- FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+ mock(ConnectorsManager.class),
+ functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -209,16 +209,16 @@ public class MembershipManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
+
+ FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
+ mock(ConnectorsManager.class),
+ functionMetaDataManager));
- FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -305,15 +305,15 @@ public class MembershipManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
+
+ FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
- FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+ mock(ConnectorsManager.class),
+ functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
@@ -380,15 +380,15 @@ public class MembershipManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
+
+ FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
- mock(ConnectorsManager.class)
- ));
- FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+ mock(ConnectorsManager.class),
+ functionMetaDataManager));
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));
List<WorkerInfo> workerInfoList = new LinkedList<>();
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0469d80..23b7259 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -751,6 +752,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// get function info
getFunctionInfoNotFound(functionName);
+
+ // make sure subscriptions are cleaned up
+ checkSubscriptionsCleanup(inputTopicName);
+
}
private static void submitExclamationFunction(Runtime runtime,
@@ -943,6 +948,21 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ private static void checkSubscriptionsCleanup(String topic) throws Exception {
+ try {
+ ContainerExecResult result = pulsarCluster.getAnyBroker().execCmd(
+ PulsarCluster.ADMIN_SCRIPT,
+ "topics",
+ "stats",
+ topic);
+ TopicStats topicStats = new Gson().fromJson(result.getStdout(), TopicStats.class);
+ assertEquals(topicStats.subscriptions.size(), 0);
+
+ } catch (ContainerExecException e) {
+ fail("Command should have exited with non-zero");
+ }
+ }
+
private static void getFunctionStatus(String functionName, int numMessages) throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,