You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/01 04:27:38 UTC

[pulsar] branch master updated: Functions metadata compaction (#7377)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d94553  Functions metadata compaction (#7377)
3d94553 is described below

commit 3d94553fe7f1f6ed17d3e115e1d08a02585c7a77
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Jun 30 21:27:25 2020 -0700

    Functions metadata compaction (#7377)
    
    * Function workers re-direct call update requests to the leader
    
    * Fixed test
    
    * tests pass
    
    * Working version
    
    * Fix test
    
    * Short circuit update
    
    * Fix test
    
    * Fix test
    
    * Fix tests
    
    * Added one more catch
    
    * Added one more catch
    
    * Separated internal and external errors
    
    * Fix test
    
    * Address feedback
    
    * Do not expose updateOnLeader to functions
    
    * hide api
    
    * hide api
    
    * removed duplicate comments
    
    * Do leadership changes in function metadata manager
    
    * make the function sync
    
    * Added more comments
    
    * Throw error
    
    * Changed name
    
    * address comments
    
    * Deleted unused classes
    
    * Rework metadata manager
    
    * Working
    
    * Fix test
    
    * A better way for test
    
    * Address feedback
    
    * Added an option to compact function metadata topic
    
    * Address feedback
    
    * Incorporate feedback
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/functions/worker/WorkerConfig.java      |   5 +
 .../pulsar/functions/utils/FunctionCommon.java     |  20 ++++
 .../functions/worker/FunctionMetaDataManager.java  | 109 +++++++++++++++------
 .../worker/FunctionMetaDataTopicTailer.java        |  10 +-
 .../pulsar/functions/worker/SchedulerManager.java  |  21 ++++
 .../worker/FunctionMetaDataManagerTest.java        |  59 ++++++++++-
 6 files changed, 186 insertions(+), 38 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index bb8d896..c056028 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -169,6 +169,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private String functionMetadataTopicName;
     @FieldContext(
+            category = CATEGORY_FUNC_METADATA_MNG,
+            doc = "Should the metadata topic be compacted?"
+    )
+    private Boolean useCompactedMetadataTopic = false;
+    @FieldContext(
         category = CATEGORY_FUNC_METADATA_MNG,
         doc = "The web service url for function workers"
     )
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index b8ac299..5a68855 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -341,6 +341,26 @@ public class FunctionCommon {
         return String.format("%s/%s/%s", tenant, namespace, functionName);
     }
 
+    public static String extractTenantFromFullyQualifiedName(String fqfn) {
+        return extractFromFullyQualifiedName(fqfn, 0);
+    }
+
+    public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
+        return extractFromFullyQualifiedName(fqfn, 1);
+    }
+
+    public static String extractNameFromFullyQualifiedName(String fqfn) {
+        return extractFromFullyQualifiedName(fqfn, 2);
+    }
+
+    private static String extractFromFullyQualifiedName(String fqfn, int index) {
+        String[] parts = fqfn.split("/");
+        if (parts.length >= 3) {
+            return parts[index];
+        }
+        throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
+    }
+
     public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader)
             throws ClassNotFoundException {
         Class<?> loadedClass = classLoader.loadClass(className);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 795a735..abe2b38 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -68,7 +69,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
     // Note that this variable serves a double duty. A non-null value
     // implies we are the leader, while a null value means we are not the leader
     private Producer exclusiveLeaderProducer;
-    private MessageId lastMessageSeen = MessageId.earliest;
+    @Getter
+    private volatile MessageId lastMessageSeen = MessageId.earliest;
+
+    private static final String versionTag = "version";
 
     @Getter
     private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
@@ -206,14 +210,30 @@ public class FunctionMetaDataManager implements AutoCloseable {
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        byte[] toWrite;
+        if (workerConfig.getUseCompactedMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
+                    .value(toWrite)
+                    .property(versionTag, Long.toString(functionMetaData.getVersion()));
+            if (workerConfig.getUseCompactedMetadataTopic()) {
+                builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+            }
+            lastMessageSeen = builder.send();
         } catch (Exception e) {
             log.error("Could not write into Function Metadata topic", e);
             throw new IllegalStateException("Internal Error updating function at the leader", e);
@@ -290,19 +310,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getUseCompactedMetadataTopic()) {
+                processCompactedMetaDataTopicMessage(message);
+            } else {
+                processUncompactedMetaDataTopicMessage(message);
             }
         } catch (IllegalArgumentException e) {
             // Its ok. Nothing much we can do about it
@@ -310,6 +321,37 @@ public class FunctionMetaDataManager implements AutoCloseable {
         lastMessageSeen = message.getMessageId();
     }
 
+    private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+        Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
+        if (log.isDebugEnabled()) {
+            log.debug("Received Service Request: {}", serviceRequest);
+        }
+        switch (serviceRequest.getServiceRequestType()) {
+            case UPDATE:
+                this.processUpdate(serviceRequest.getFunctionMetaData());
+                break;
+            case DELETE:
+                this.proccessDeregister(serviceRequest.getFunctionMetaData());
+                break;
+            default:
+                log.warn("Received request with unrecognized type: {}", serviceRequest);
+        }
+    }
+
+    private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
+        long version = Long.valueOf(message.getProperty(versionTag));
+        String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(message.getKey());
+        String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(message.getKey());
+        String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
+        if (message.getData() == null || message.getData().length == 0) {
+            // this is a delete message
+            this.proccessDeregister(tenant, namespace, functionName, version);
+        } else {
+            FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
+            this.processUpdate(functionMetaData);
+        }
+    }
+
     /**
      * Private methods for internal use.  Should not be used outside of this class
      */
@@ -336,25 +378,29 @@ public class FunctionMetaDataManager implements AutoCloseable {
 
     @VisibleForTesting
     synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
-
         String functionName = deregisterRequestFs.getFunctionDetails().getName();
         String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
         String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
+        return proccessDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
+    }
+
+    synchronized boolean proccessDeregister(String tenant, String namespace,
+                                            String functionName, long version) throws IllegalArgumentException {
 
         boolean needsScheduling = false;
 
-        log.debug("Process deregister request: {}", deregisterRequestFs);
+        log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);
 
         // Check if we still have this function. Maybe already deleted by someone else
-        if (this.containsFunctionMetaData(deregisterRequestFs)) {
+        if (this.containsFunctionMetaData(tenant, namespace, functionName)) {
             // check if request is outdated
-            if (!isRequestOutdated(deregisterRequestFs)) {
+            if (!isRequestOutdated(tenant, namespace, functionName, version)) {
                 this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
                 needsScheduling = true;
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
-                            deregisterRequestFs.getVersion());
+                            version);
                 }
                 throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
             }
@@ -393,9 +439,14 @@ public class FunctionMetaDataManager implements AutoCloseable {
 
     private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
         Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
-        FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
-                .get(functionDetails.getNamespace()).get(functionDetails.getName());
-        return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
+        return isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(),
+                functionDetails.getName(), requestFunctionMetaData.getVersion());
+    }
+
+    private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
+        FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
+                .get(namespace).get(functionName);
+        return currentFunctionMetaData.getVersion() >= version;
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index 8b51971..0974835 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Request.ServiceRequest;
 
 @Slf4j
 public class FunctionMetaDataTopicTailer
@@ -131,11 +130,14 @@ public class FunctionMetaDataTopicTailer
 
     public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
                                       MessageId startMessageId) throws PulsarClientException {
-        return readerBuilder
+        ReaderBuilder builder = readerBuilder
                 .topic(workerConfig.getFunctionMetadataTopic())
                 .startMessageId(startMessageId)
                 .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
-                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
-                .create();
+                .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer");
+        if (workerConfig.getUseCompactedMetadataTopic()) {
+            builder = builder.readCompacted(true);
+        }
+        return builder.create();
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 0b8eeb2..0cd682a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -112,6 +112,8 @@ public class SchedulerManager implements AutoCloseable {
     @Getter
     private MessageId lastMessageProduced = null;
 
+    private MessageId metadataTopicLastMessage = MessageId.earliest;
+
     public SchedulerManager(WorkerConfig workerConfig,
                             PulsarClient pulsarClient,
                             PulsarAdmin admin,
@@ -224,6 +226,13 @@ public class SchedulerManager implements AutoCloseable {
                     isCompactionNeeded.set(false);
                 }
             }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
+
+            executor.scheduleWithFixedDelay(() -> {
+                if (leaderService.isLeader() && metadataTopicLastMessage.compareTo(functionMetaDataManager.getLastMessageSeen()) != 0) {
+                    metadataTopicLastMessage = functionMetaDataManager.getLastMessageSeen();
+                    compactFunctionMetadataTopic();
+                }
+            }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
         }
     }
     
@@ -337,6 +346,18 @@ public class SchedulerManager implements AutoCloseable {
         }
     }
 
+    private void compactFunctionMetadataTopic() {
+        if (this.admin != null) {
+            try {
+                this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
+            } catch (PulsarAdminException e) {
+                log.error("Failed to trigger compaction", e);
+                scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
+                        TimeUnit.SECONDS);
+            }
+        }
+    }
+
     private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
         try {
             String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index b5fb0df..25e62f0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,12 +35,25 @@ import org.testng.annotations.Test;
 
 public class FunctionMetaDataManagerTest {
 
+    static byte[] producerByteArray;
+
     private static PulsarClient mockPulsarClient() throws PulsarClientException {
         ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
         when(builder.topic(anyString())).thenReturn(builder);
         when(builder.producerName(anyString())).thenReturn(builder);
 
-        when(builder.create()).thenReturn(mock(Producer.class));
+        Producer producer = mock(Producer.class);
+        TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
+        when(messageBuilder.key(anyString())).thenReturn(messageBuilder);
+        doAnswer(invocation -> {
+            Object arg0 = invocation.getArgument(0);
+            FunctionMetaDataManagerTest.producerByteArray = (byte[])arg0;
+            return messageBuilder;
+        }).when(messageBuilder).value(any());
+        when(messageBuilder.property(anyString(), anyString())).thenReturn(messageBuilder);
+        when(producer.newMessage()).thenReturn(messageBuilder);
+
+        when(builder.create()).thenReturn(producer);
 
         PulsarClient client = mock(PulsarClient.class);
         when(client.newProducer()).thenReturn(builder);
@@ -86,10 +100,20 @@ public class FunctionMetaDataManagerTest {
     }
 
     @Test
-    public void testUpdateIfLeaderFunction() throws PulsarClientException {
+    public void testUpdateIfLeaderFunctionWithoutCompaction() throws PulsarClientException {
+        testUpdateIfLeaderFunction(false);
+    }
+
+    @Test
+    public void testUpdateIfLeaderFunctionWithCompaction() throws PulsarClientException {
+        testUpdateIfLeaderFunction(true);
+    }
+
+    private void testUpdateIfLeaderFunction(boolean compact) throws PulsarClientException {
 
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
+        workerConfig.setUseCompactedMetadataTopic(compact);
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
@@ -110,6 +134,11 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager.acquireLeadership();
         // Now w should be able to really update
         functionMetaDataManager.updateFunctionOnLeader(m1, false);
+        if (compact) {
+            Assert.assertTrue(Arrays.equals(m1.toByteArray(), producerByteArray));
+        } else {
+            Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
+        }
 
         // outdated request
         try {
@@ -119,15 +148,30 @@ public class FunctionMetaDataManagerTest {
             Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
         }
         // udpate with new version
-        m1 = m1.toBuilder().setVersion(2).build();
-        functionMetaDataManager.updateFunctionOnLeader(m1, false);
+        Function.FunctionMetaData m2 = m1.toBuilder().setVersion(2).build();
+        functionMetaDataManager.updateFunctionOnLeader(m2, false);
+        if (compact) {
+            Assert.assertTrue(Arrays.equals(m2.toByteArray(), producerByteArray));
+        } else {
+            Assert.assertFalse(Arrays.equals(m2.toByteArray(), producerByteArray));
+        }
+    }
+
+    @Test
+    public void deregisterFunctionWithoutCompaction() throws PulsarClientException {
+        deregisterFunction(false);
     }
 
     @Test
-    public void deregisterFunction() throws PulsarClientException {
+    public void deregisterFunctionWithCompaction() throws PulsarClientException {
+        deregisterFunction(true);
+    }
+
+    private void deregisterFunction(boolean compact) throws PulsarClientException {
         SchedulerManager mockedScheduler = mock(SchedulerManager.class);
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
+        workerConfig.setUseCompactedMetadataTopic(compact);
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mockedScheduler,
@@ -170,6 +214,11 @@ public class FunctionMetaDataManagerTest {
         m1 = m1.toBuilder().setVersion(2).build();
         functionMetaDataManager.updateFunctionOnLeader(m1, true);
         verify(mockedScheduler, times(2)).schedule();
+        if (compact) {
+            Assert.assertTrue(Arrays.equals("".getBytes(), producerByteArray));
+        } else {
+            Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
+        }
     }
 
     @Test