You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/05/27 20:58:30 UTC
[pulsar] branch master updated: Remove function files stored in BK
when function is de-registered (#7052)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb182c0 Remove function files stored in BK when function is de-registered (#7052)
fb182c0 is described below
commit fb182c04c0137347ffefdd00d0e5d5bf27d979f6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 27 13:58:14 2020 -0700
Remove function files stored in BK when function is de-registered (#7052)
* remove function files stored in BK when function is de-registered
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../worker/PulsarFunctionPublishTest.java | 149 +++++++++++++++++++--
.../pulsar/functions/worker/WorkerUtils.java | 11 +-
.../functions/worker/rest/api/ComponentImpl.java | 41 +++---
3 files changed, 169 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 8a4928f..e8b8aa8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -27,21 +27,10 @@ import static org.testng.Assert.assertNotEquals;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.pulsar.broker.NoOpShutdownService;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -77,6 +66,21 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
/**
* Test Pulsar function state
*
@@ -364,4 +368,121 @@ public class PulsarFunctionPublishTest {
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
+
+ /**
+  * Verifies that de-registering a function removes its package from BookKeeper.
+  *
+  * <p>The test registers a function from the examples jar, pushes a few messages through it,
+  * deletes the function, and then checks that (a) the function's subscription on the source
+  * topic is gone, (b) no temporary "function*" files remain in java.io.tmpdir, and (c) no
+  * DistributedLog streams remain under {@code tenant/namespace/functionName}.
+  */
+ @Test(timeOut = 20000)
+ public void testPulsarFunctionBKCleanup() throws Exception {
+     final String namespacePortion = "io";
+     final String replNamespace = tenant + "/" + namespacePortion;
+     final String sourceTopic = "persistent://" + replNamespace + "/input";
+     final String publishTopic = "persistent://" + replNamespace + "/publishtopic";
+     final String propertyKey = "key";
+     final String propertyValue = "value";
+     final String functionName = "PulsarFunction-test";
+     final String subscriptionName = "test-sub";
+     final int totalMsgs = 5;
+
+     admin.namespaces().createNamespace(replNamespace);
+     Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+     admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+     // create a producer that creates a topic at broker
+     Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+     Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(publishTopic).subscriptionName("sub").subscribe();
+
+     FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+             sourceTopic, publishTopic, subscriptionName);
+
+     String jarFilePath = getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+     File jarFile = new File(jarFilePath);
+     Assert.assertTrue(jarFile.exists() && jarFile.isFile());
+     admin.functions().createFunction(functionConfig, jarFilePath);
+
+     retryStrategically((test) -> {
+         try {
+             return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+         } catch (PulsarAdminException e) {
+             return false;
+         }
+     }, 50, 150);
+     // validate the function's consumer has started on the source topic
+     assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+     for (int i = 0; i < totalMsgs; i++) {
+         String data = "foo";
+         producer.newMessage().property(propertyKey, propertyValue).key(String.valueOf(i)).value(data).send();
+     }
+     retryStrategically((test) -> {
+         try {
+             SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+             return subStats.unackedMessages == 0;
+         } catch (PulsarAdminException e) {
+             return false;
+         }
+     }, 50, 150);
+
+     retryStrategically((test) -> {
+         try {
+             FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+             return functionStat.getProcessedSuccessfullyTotal() == totalMsgs;
+         } catch (PulsarAdminException e) {
+             return false;
+         }
+     }, 50, 150);
+
+     for (int i = 0; i < totalMsgs; i++) {
+         Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+         // receive() returns null on timeout; fail with a readable message instead of an NPE
+         Assert.assertNotNull(msg, "Timed out waiting for message " + i + " on " + publishTopic);
+         String receivedPropertyValue = msg.getProperty(propertyKey);
+         assertEquals(receivedPropertyValue, propertyValue);
+         assertEquals(msg.getProperty("input_topic"), sourceTopic);
+         assertEquals(msg.getKey(), String.valueOf(i));
+     }
+
+     // sanity check: the function's subscription must not be left with every message unacked
+     assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+             totalMsgs);
+
+     // delete functions
+     admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+     retryStrategically((test) -> {
+         try {
+             return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+         } catch (PulsarAdminException e) {
+             return false;
+         }
+     }, 50, 150);
+
+     // make sure subscriptions are cleaned up
+     assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+     // make sure all temp files are deleted
+     File dir = new File(System.getProperty("java.io.tmpdir"));
+     File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
+
+     Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+
+     DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
+
+     // check if all function files are deleted from BK
+     String url = String.format("distributedlog://127.0.0.1:%s/pulsar/functions", bkEnsemble.getZookeeperPort());
+     log.info("dlog url: {}", url);
+     URI dlogUri = URI.create(url);
+
+     Namespace dlogNamespace = NamespaceBuilder.newBuilder()
+             .conf(dlogConf)
+             .clientId("function-worker-" + workerConfig.getWorkerId())
+             .uri(dlogUri)
+             .build();
+
+     List<String> files = new LinkedList<>();
+     try {
+         dlogNamespace.getLogs(String.format("%s/%s/%s", tenant, namespacePortion, functionName))
+                 .forEachRemaining(files::add);
+     } finally {
+         // release the namespace's underlying clients even when the lookup throws
+         dlogNamespace.close();
+     }
+
+     assertEquals(files.size(), 0, "BK files left over: " + files);
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 99faadd..a4e46a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.functions.worker;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.AppendOnlyStreamWriter;
@@ -55,9 +58,6 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
@Slf4j
public final class WorkerUtils {
@@ -118,6 +118,11 @@ public final class WorkerUtils {
}
}
+ /**
+  * Deletes a stored package from BookKeeper by removing its log from the given namespace.
+  *
+  * <p>Logs the path at INFO level, then delegates to {@code namespace.deleteLog(packagePath)}.
+  * No existence check is performed here; whatever {@code deleteLog} does for a missing
+  * log (NOTE(review): presumably it throws — confirm against the DistributedLog API)
+  * is passed straight through to the caller.
+  *
+  * @param namespace   the namespace to delete the log from
+  * @param packagePath the path (log name) of the package to delete
+  * @throws IOException if {@code namespace.deleteLog} fails
+  */
+ public static void deleteFromBookkeeper(Namespace namespace, String packagePath) throws IOException {
+ log.info("Deleting {} from BK", packagePath);
+ namespace.deleteLog(packagePath);
+ }
+
public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) {
int numReplicas = workerConfig.getNumFunctionPackageReplicas();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b6daa5e..cad442f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,6 +18,17 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -71,11 +82,6 @@ import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-import javax.ws.rs.core.UriBuilder;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -93,16 +99,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
@Slf4j
public abstract class ComponentImpl {
@@ -408,6 +409,16 @@ public abstract class ComponentImpl {
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
}
+
+ // clean up component files stored in BK
+ if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
+ try {
+ WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath());
+ } catch (IOException e) {
+ log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
+ functionMetaData.getPackageLocation().getPackagePath(), e);
+ }
+ }
}
public FunctionConfig getFunctionInfo(final String tenant,