Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/27 18:15:21 UTC

[GitHub] [pulsar] srkukarni opened a new pull request #7377: Functions metadata compaction

srkukarni opened a new pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377


   
   
   ### Motivation
   
   Currently we do not compact the FunctionMetadata topic. This PR adds the ability to do that.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447343999



##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
##########
@@ -168,6 +168,11 @@
         doc = "The pulsar topic used for storing function metadata"
     )
     private String functionMetadataTopicName;
+    @FieldContext(
+            category = CATEGORY_FUNC_METADATA_MNG,
+            doc = "Should the metadata topic be compacted?"
+    )
+    private Boolean compactMetadataTopic = false;

Review comment:
       Given the current approach, this is a dangerous flag. If a user mistakenly flips the flag on, it could corrupt their cluster. We should perhaps add more warnings about what this config will do. Instead of "compactMetadataTopic", maybe we can rename it to "useCompactedMetadataTopic" so that what it does is clearer?







[GitHub] [pulsar] srkukarni commented on pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#issuecomment-651277848


   > @srkukarni the reader in the FunctionMetadataTopicTailer needs to
   > 
   > ` .readCompacted(true)`
   
   https://github.com/apache/pulsar/pull/7377/files#diff-fbc6eb611de17f87b29ba52e30bb7fcbR140
   





[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447160507



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       I actually prefer an explicit setting.
   Plus, with this change, the key is set whether or not you are compacting the topic.







[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447401144



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -206,14 +208,29 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        String key = FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails());
+        byte[] toWrite;
+        if (workerConfig.getCompactMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            lastMessageSeen = exclusiveLeaderProducer.newMessage()
+                    .key(key)
+                    .value(toWrite)
+                    .property("version", Long.toString(functionMetaData.getVersion()))

Review comment:
       Changed

##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
##########
@@ -168,6 +168,11 @@
         doc = "The pulsar topic used for storing function metadata"
     )
     private String functionMetadataTopicName;
+    @FieldContext(
+            category = CATEGORY_FUNC_METADATA_MNG,
+            doc = "Should the metadata topic be compacted?"
+    )
+    private Boolean compactMetadataTopic = false;

Review comment:
       Changed







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r448018079



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +310,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getUseCompactedMetadataTopic()) {

Review comment:
       Should we change this check to test whether a key exists or not? This creates an avenue by which an existing cluster can transition to using a compacted metadata topic.
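
A minimal sketch of what a key-based check could look like, written against a hypothetical stand-in type rather than Pulsar's `Message<byte[]>` (whose `hasKey()` method the stand-in mirrors). The class name, the `Msg` type, and the returned labels are all assumptions made for illustration. It also assumes that keyless messages are always in the old format; since the diff under review sets a key on both write paths, a real check would likely need more than key presence.

```java
// Hypothetical stand-in types for illustration; not the worker's actual code.
final class FormatDispatchSketch {

    /** Minimal message: an optional key plus a raw payload. */
    static final class Msg {
        final String key;      // null for messages written without a key
        final byte[] payload;

        Msg(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }

        boolean hasKey() {
            return key != null;
        }
    }

    /** Picks a handling path from the message itself instead of a config flag. */
    static String route(Msg m) {
        if (!m.hasKey()) {
            return "legacy-service-request"; // old format: would be parsed as a ServiceRequest
        }
        if (m.payload.length == 0) {
            return "compacted-delete";       // keyed empty payload: would deregister the function
        }
        return "compacted-update";           // keyed payload: would be parsed as FunctionMetaData
    }

    public static void main(String[] args) {
        System.out.println(route(new Msg(null, new byte[] {1})));
        System.out.println(route(new Msg("tenant/ns/fn", new byte[0])));
        System.out.println(route(new Msg("tenant/ns/fn", new byte[] {1})));
    }
}
```

Dispatching on the message rather than on configuration is what would let one reader process a topic that contains both formats.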







[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447378041



##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
##########
@@ -168,6 +168,11 @@
         doc = "The pulsar topic used for storing function metadata"
     )
     private String functionMetadataTopicName;
+    @FieldContext(
+            category = CATEGORY_FUNC_METADATA_MNG,
+            doc = "Should the metadata topic be compacted?"
+    )
+    private Boolean compactMetadataTopic = false;

Review comment:
       Will change the name, although the possibility of corruption is low: before a leader can actually start writing, it needs to read all existing messages, and the messages will not deserialize either way.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447164743



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       The simplest thing may be to change this check from
   
   `if (workerConfig.getCompactMetadataTopic())` 
   
   to
   
   `if (message.getData() == null)`
   
   so that old messages are processed correctly, as well as new messages in the new format.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447159078



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       Or should we just check whether the message has data or not?







[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447891444



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -206,14 +210,29 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        String key = FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails());
+        byte[] toWrite;
+        if (workerConfig.getUseCompactedMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            lastMessageSeen = exclusiveLeaderProducer.newMessage()
+                    .key(key)

Review comment:
       Is that really useful?

##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -206,14 +210,29 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        String key = FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails());
+        byte[] toWrite;
+        if (workerConfig.getUseCompactedMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            lastMessageSeen = exclusiveLeaderProducer.newMessage()
+                    .key(key)

Review comment:
       Changed







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447345328



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -206,14 +208,29 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        String key = FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails());
+        byte[] toWrite;
+        if (workerConfig.getCompactMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            lastMessageSeen = exclusiveLeaderProducer.newMessage()
+                    .key(key)
+                    .value(toWrite)
+                    .property("version", Long.toString(functionMetaData.getVersion()))

Review comment:
       Let's use a
   
   ```
   private static final String
   ```
   constant for "version".
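
As a rough illustration of that suggestion, the sketch below keeps the property name in one constant that both the writing and the reading side use. The class and method names are hypothetical and a plain `Map` stands in for message properties; only the `"version"` property name and the `Long.toString` encoding come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the pull request.
final class VersionPropertySketch {

    // Single definition of the property name, shared by writer and reader.
    private static final String VERSION_PROPERTY = "version";

    /** Builds the properties a writer would attach to a metadata message. */
    static Map<String, String> propertiesFor(long version) {
        Map<String, String> props = new HashMap<>();
        props.put(VERSION_PROPERTY, Long.toString(version));
        return props;
    }

    /** Reads the version back; assumes the property is present and numeric. */
    static long versionOf(Map<String, String> props) {
        return Long.parseLong(props.get(VERSION_PROPERTY));
    }

    public static void main(String[] args) {
        Map<String, String> props = propertiesFor(3L);
        System.out.println(versionOf(props));
    }
}
```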







[GitHub] [pulsar] jerrypeng commented on pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#issuecomment-651277053


   @srkukarni the reader in the FunctionMetadataTopicTailer needs to 
   
   `.readCompacted(true)`





[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447168514



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       The format of the messages is now different. 







[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447377378



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       Not just the version. We actually write the function metadata when compaction is turned on. When the topic is not compacted, we write the service request for backwards compatibility. 
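
To illustrate why the compacted format pairs a per-function key with an empty payload on delete, here is a simplified model of key-based compaction. It is a sketch under stated assumptions, not Pulsar's implementation: the `Entry` type, the `compact` method, and the example keys are hypothetical, and the only behavior modeled is "latest value per key wins, an empty payload removes the key".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of key-based compaction for illustration.
final class CompactionSketch {

    /** One keyed entry in the log: a key plus a raw payload. */
    static final class Entry {
        final String key;
        final byte[] payload;

        Entry(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /**
     * Replays the log in order, keeping the latest payload per key and
     * dropping a key when an empty payload (the delete marker) is seen.
     */
    static Map<String, byte[]> compact(List<Entry> log) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.payload.length == 0) {
                latest.remove(e.key);         // empty payload: the key is deleted
            } else {
                latest.put(e.key, e.payload); // later entries overwrite earlier ones
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("tenant/ns/fn-a", "v1".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("tenant/ns/fn-b", "v1".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("tenant/ns/fn-a", "v2".getBytes(StandardCharsets.UTF_8)));
        log.add(new Entry("tenant/ns/fn-b", new byte[0])); // delete fn-b
        System.out.println(compact(log).keySet());
    }
}
```

A ServiceRequest-style message carries its own request ID and type, so without a stable key and a recognizable delete marker there is nothing for compaction to collapse; that is the practical difference between the two write paths.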







[GitHub] [pulsar] srkukarni commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447378116



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -206,14 +208,29 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
         } else {
             needsScheduling = processUpdate(functionMetaData);
         }
-        Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
-                .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
-                .setFunctionMetaData(functionMetaData)
-                .setWorkerId(workerConfig.getWorkerId())
-                .setRequestId(UUID.randomUUID().toString())
-                .build();
+        String key = FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails());
+        byte[] toWrite;
+        if (workerConfig.getCompactMetadataTopic()) {
+            if (delete) {
+                toWrite = "".getBytes();
+            } else {
+                toWrite = functionMetaData.toByteArray();
+            }
+        } else {
+            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
+                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
+                    .setFunctionMetaData(functionMetaData)
+                    .setWorkerId(workerConfig.getWorkerId())
+                    .setRequestId(UUID.randomUUID().toString())
+                    .build();
+            toWrite = serviceRequest.toByteArray();
+        }
         try {
-            lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
+            lastMessageSeen = exclusiveLeaderProducer.newMessage()
+                    .key(key)
+                    .value(toWrite)
+                    .property("version", Long.toString(functionMetaData.getVersion()))

Review comment:
       Ok







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447159078



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       Or should we just check if the message has a key or not?








[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447162755



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       The simplest thing may be to change this check from
   
   `if (workerConfig.getCompactMetadataTopic())` 
   
   to
   
   `if (message.hasKey())`
   
   so that old messages are processed correctly, as well as new messages in the new format.
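   
   A rough sketch of that dispatch, to make the idea concrete. It is not the code in this PR: `Msg`, `Action` and `classify` are hypothetical stand-ins so the example compiles without Pulsar on the classpath, and treating an empty payload as a delete is an assumption based on the write path in this diff, which sends `"".getBytes()` for deletes.

```java
// Hypothetical, self-contained sketch: chooses a processing path per message
// from the message itself rather than from the worker config.
class MetadataDispatchSketch {

    // The three ways a metadata topic message could be handled.
    enum Action { UPDATE, DELETE, LEGACY_SERVICE_REQUEST }

    // Stand-in for the parts of a Pulsar message used here (assumption).
    static final class Msg {
        final String key;   // null when the message was published without a key
        final byte[] data;

        Msg(String key, byte[] data) {
            this.key = key;
            this.data = data;
        }

        boolean hasKey() {
            return key != null;
        }
    }

    static Action classify(Msg m) {
        if (!m.hasKey()) {
            // Old format: the payload is a serialized ServiceRequest.
            return Action.LEGACY_SERVICE_REQUEST;
        }
        // New format: keyed message; an empty payload marks a delete.
        boolean empty = m.data == null || m.data.length == 0;
        return empty ? Action.DELETE : Action.UPDATE;
    }

    public static void main(String[] args) {
        System.out.println(classify(new Msg("tenant/ns/fn", new byte[] {1})));
        System.out.println(classify(new Msg("tenant/ns/fn", new byte[0])));
        System.out.println(classify(new Msg(null, new byte[] {1})));
    }
}
```

   With this shape a worker could read a topic that holds a mix of old and new messages, which is what a migration would need.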







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447342909



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       Only the version is added as part of the properties, right? We can still process the messages differently based on whether the data == null or not.







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r447161260



##########
File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
##########
@@ -290,26 +307,48 @@ public synchronized void giveupLeadership() {
      */
     public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
         try {
-            Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
-            if (log.isDebugEnabled()) {
-                log.debug("Received Service Request: {}", serviceRequest);
-            }
-            switch (serviceRequest.getServiceRequestType()) {
-                case UPDATE:
-                    this.processUpdate(serviceRequest.getFunctionMetaData());
-                    break;
-                case DELETE:
-                    this.proccessDeregister(serviceRequest.getFunctionMetaData());
-                    break;
-                default:
-                    log.warn("Received request with unrecognized type: {}", serviceRequest);
+            if (workerConfig.getCompactMetadataTopic()) {

Review comment:
       Is there a way for a user who is already using functions to turn compaction on? Is there a migration path for these users?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377#discussion_r448042974



##########
File path: pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
##########
@@ -168,6 +168,11 @@
         doc = "The pulsar topic used for storing function metadata"
     )
     private String functionMetadataTopicName;
+    @FieldContext(
+            category = CATEGORY_FUNC_METADATA_MNG,
+            doc = "Should the metadata topic be compacted?"
+    )
+    private Boolean compactMetadataTopic = false;

Review comment:
       I would suggest adding more detail to the doc annotation about the impact of flipping the flag.







[GitHub] [pulsar] srkukarni merged pull request #7377: Functions metadata compaction

Posted by GitBox <gi...@apache.org>.
srkukarni merged pull request #7377:
URL: https://github.com/apache/pulsar/pull/7377


   

