You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/05/27 20:58:30 UTC

[pulsar] branch master updated: Remove function files stored in BK when function is de-registered (#7052)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb182c0  Remove function files stored in BK when function is de-registered (#7052)
fb182c0 is described below

commit fb182c04c0137347ffefdd00d0e5d5bf27d979f6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 27 13:58:14 2020 -0700

    Remove function files stored in BK when function is de-registered (#7052)
    
    * remove function files stored in BK when function is de-registered
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../worker/PulsarFunctionPublishTest.java          | 149 +++++++++++++++++++--
 .../pulsar/functions/worker/WorkerUtils.java       |  11 +-
 .../functions/worker/rest/api/ComponentImpl.java   |  41 +++---
 3 files changed, 169 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 8a4928f..e8b8aa8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -27,21 +27,10 @@ import static org.testng.Assert.assertNotEquals;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.pulsar.broker.NoOpShutdownService;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -77,6 +66,21 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Test Pulsar function state
  *
@@ -364,4 +368,121 @@ public class PulsarFunctionPublishTest {
 
         Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
     }
+
+    @Test(timeOut = 20000) // deploy a function, delete it, then verify no BK logs remain under its path
+    public void testPulsarFunctionBKCleanup() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String publishTopic = "persistent://" + replNamespace + "/publishtopic";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        // create a producer on the input topic (creating it also creates the topic at the broker)
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(publishTopic).subscriptionName("sub").subscribe();
+
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+          sourceTopic, publishTopic, subscriptionName);
+
+        String jarFilePath = getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        File jarFile = new File(jarFilePath);
+        Assert.assertTrue(jarFile.exists() && jarFile.isFile());
+        admin.functions().createFunction(functionConfig, jarFilePath);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        // validate exactly one subscription (the function's) now exists on the input topic
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "foo";
+            producer.newMessage().property(propertyKey, propertyValue).key(String.valueOf(i)).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        retryStrategically((test) -> {
+            try {
+                FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+                return functionStat.getProcessedSuccessfullyTotal() == 5;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        for (int i = 0; i < 5; i++) { // outputs carry the input key/property plus an "input_topic" property
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS); // NOTE(review): may be null on timeout
+            String receivedPropertyValue = msg.getProperty(propertyKey);
+            assertEquals(propertyValue, receivedPropertyValue);
+            assertEquals(msg.getProperty("input_topic"), sourceTopic);
+            assertEquals(msg.getKey(), String.valueOf(i));
+        }
+
+        // validate the function's subscription on the input topic is not still holding every message
+        // as unacked, i.e. the messages it processed were acknowledged
+        assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+          totalMsgs);
+
+        // delete the function; deregistration is expected to also remove its package from BK
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        // make sure the function's subscription on the input topic was cleaned up
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+        // make sure no temporary files whose names start with "function" remain in java.io.tmpdir
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
+
+        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+
+        DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
+
+        // check that no DistributedLog logs remain in BK under <tenant>/<namespace>/<function>
+        String url = String.format("distributedlog://%s/pulsar/functions", "127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        log.info("dlog url: {}", url);
+        URI dlogUri = URI.create(url);
+
+        Namespace dlogNamespace = NamespaceBuilder.newBuilder()
+          .conf(dlogConf)
+          .clientId("function-worker-" + workerConfig.getWorkerId())
+          .uri(dlogUri)
+          .build();
+
+        List<String> files = new LinkedList<>(); // names of logs still present under the function's path
+        dlogNamespace.getLogs(String.format("%s/%s/%s", tenant, namespacePortion, functionName)).forEachRemaining(new java.util.function.Consumer<String>() {
+            @Override
+            public void accept(String s) {
+                files.add(s);
+            }
+        });
+
+        assertEquals(files.size(), 0, "BK files left over: " + files);
+
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 99faadd..a4e46a8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.AppendOnlyStreamWriter;
@@ -55,9 +58,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 @Slf4j
 public final class WorkerUtils {
 
@@ -118,6 +118,11 @@ public final class WorkerUtils {
         }
     }
 
+    public static void deleteFromBookkeeper(Namespace namespace, String packagePath) throws IOException {
+        log.info("Deleting {} from BK", packagePath);
+        namespace.deleteLog(packagePath); // remove the log named packagePath; IOException propagates to caller
+    }
+
     public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) {
         int numReplicas = workerConfig.getNumFunctionPackageReplicas();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b6daa5e..cad442f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,6 +18,17 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -71,11 +82,6 @@ import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-import javax.ws.rs.core.UriBuilder;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -93,16 +99,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
-import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriBuilder;
 
 @Slf4j
 public abstract class ComponentImpl {
@@ -408,6 +409,16 @@ public abstract class ComponentImpl {
                     ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
         }
+
+        // clean up component files stored in BK
+        if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
+            try {
+                WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath());
+            } catch (IOException e) {
+                log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
+                  functionMetaData.getPackageLocation().getPackagePath(), e);
+            }
+        }
     }
 
     public FunctionConfig getFunctionInfo(final String tenant,