You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/29 08:52:09 UTC

[GitHub] [pulsar] shibd opened a new pull request, #16279: Implement java runtime instance

shibd opened a new pull request, #16279:
URL: https://github.com/apache/pulsar/pull/16279

   ### Motivation
   
   #15560 
   
   
   ### Modifications
   - 
   
   ### Verifying this change
   - *Added integration tests for end-to-end deployment with large payloads (10MB)*
   - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
   - [x] `doc` 
   (Your PR contains doc changes)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1178647119

   @nahguam Thanks for your review, all suggestions are fixed, please look again. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r912790688


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -350,7 +351,8 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
         }
     }
 
-    private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    @VisibleForTesting
+    protected void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {

Review Comment:
   This may not be great, but this test is important. It is also a way of compromise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1200340683

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r916416466


##########
pulsar-function-go/pf/instanceConf_test.go:
##########
@@ -96,3 +98,18 @@ func TestInstanceConf_GetInstanceName(t *testing.T) {
 
 	assert.Equal(t, "101", instanceName)
 }
+
+func TestInstanceConf_Fail(t *testing.T) {
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 0, AutoACK: false})
+	}, "Should have a panic")
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 1, AutoACK: false})
+	}, "Should have a panic")
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 2})
+	}, "Should have a panic")

Review Comment:
   Does this will introduce break change? If users upgrade to the new version but with the old configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nahguam commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
nahguam commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r933769383


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -350,7 +351,8 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
         }
     }
 
-    private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    @VisibleForTesting
+    protected void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {

Review Comment:
   I think it's fine to do that, but package private is more restrictive than protected but still gives the access you need for testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1200371322

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nahguam commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
nahguam commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r912746018


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -350,7 +351,8 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
         }
     }
 
-    private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    @VisibleForTesting
+    protected void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {

Review Comment:
   would package private be more suitable?



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java:
##########
@@ -275,8 +278,14 @@ public X process(T input, Context context) throws Exception {
             initialize(context);
         }
 
+        // record must is PulsarFunctionRecord.

Review Comment:
   ```suggestion
           // record must be PulsarFunctionRecord.
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java:
##########
@@ -360,6 +360,48 @@ public Optional<String> getDestinationTopic() {
             }, "out1");
 
 
+            pulsarSink.write(record);
+
+            Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkManualProcessor);
+            PulsarSink.PulsarSinkManualProcessor pulsarSinkManualProcessor
+                    = (PulsarSink.PulsarSinkManualProcessor) pulsarSink.pulsarSinkProcessor;
+            if (topic != null) {
+                Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(topic));
+            } else {
+                Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(defaultTopic));
+            }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> {
+                if (topic != null) {
+                    return topic.equals(otherTopic);
+                } else {
+                    return defaultTopic.equals(otherTopic);
+                }
+            }));
+        }
+
+        /** test At-least-once **/
+        pulsarClient = getPulsarClient();
+        pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE);
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        for (String topic : topics) {
+
+            SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+
+                @Override
+                public String getValue() {
+                    return "in1";
+                }
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    return getTopicOptional(topic);
+                }
+            }, "out1");
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarFunctionRecordTest.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.*;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+public class PulsarFunctionRecordTest {
+
+    @Test
+    public void testAck() {
+        Record record = mock(Record.class);
+        Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATMOST_ONCE).build();
+        PulsarFunctionRecord pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.EFFECTIVELY_ONCE).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(1)).ack();
+    }
+}

Review Comment:
   ```suggestion
   }
   
   ```



##########
pulsar-function-go/conf/conf.go:
##########
@@ -49,8 +49,9 @@ type Conf struct {
 	ProcessingGuarantees int32  `json:"processingGuarantees" yaml:"processingGuarantees"`
 	SecretsMap           string `json:"secretsMap" yaml:"secretsMap"`
 	Runtime              int32  `json:"runtime" yaml:"runtime"`
-	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
-	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
+	//Deprecated
+	AutoACK     bool  `json:"autoAck" yaml:"autoAck"`
+	Parallelism int32 `json:"parallelism" yaml:"parallelism"`

Review Comment:
   Restore the type alignment
   
   ```suggestion
   	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
   	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
   ```



##########
pulsar-function-go/pf/instanceConf.go:
##########
@@ -103,6 +103,21 @@ func newInstanceConf() *instanceConf {
 			UserConfig: cfg.UserConfig,
 		},
 	}
+
+	if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
+		panic("Go instance current not support EFFECTIVELY_ONCE processing guarantees.")
+	}
+
+	if instanceConf.funcDetails.AutoAck == false &&
+		(instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE ||
+			instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE) {
+		panic("When Guarantees == " + instanceConf.funcDetails.ProcessingGuarantees.String() +
+			", autoAck must be equal to true, If you want not to automatically ack, " +
+			"please configure the processing guarantees as MANUAL." +
+			"This is a contradictory configuration, autoAck will be removed later," +
+			"Please refer to PIP: https://github.com/apache/pulsar/issues/15560")

Review Comment:
   ```suggestion
   			", autoAck must be equal to true. If you want not to automatically ack, " +
   			"please configure the processing guarantees as MANUAL." +
   			" This is a contradictory configuration, autoAck will be removed later." +
   			" Please refer to PIP: https://github.com/apache/pulsar/issues/15560")
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java:
##########
@@ -127,6 +131,48 @@ public void cleanUp() {
         testWindowedPulsarFunction.shutdown();
     }
 
+    @Test
+    public void testWindowFunctionWithAtmostOnce() throws Exception {
+        windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees.ATMOST_ONCE);
+        doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))).when(context)
+                .getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+        Record record = mock(Record.class);
+        when(context.getCurrentRecord()).thenReturn(record);
+        doReturn(Optional.of("test-topic")).when(record).getTopicName();
+        doReturn(record).when(context).getCurrentRecord();
+        doReturn(100l).when(record).getValue();
+        testWindowedPulsarFunction.process(10L, context);
+        verify(record, times(1)).ack();
+    }
+
+    @Test
+    public void testWindowFunctionWithAtleastOnce() throws Exception {
+
+        WindowConfig config = new WindowConfig();
+        config.setProcessingGuarantees(WindowConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        WindowFunctionExecutor windowFunctionExecutor = spy(WindowFunctionExecutor.class);
+        windowFunctionExecutor.windowConfig = config;
+        doNothing().when(windowFunctionExecutor).initialize(any());
+        doReturn(new Object()).when(windowFunctionExecutor).process(any(Window.class), any(WindowContext.class));
+        doReturn(CompletableFuture.completedFuture(null)).when(context).publish(any(), any(), any());
+
+        List<Event<Record<Long>>> tuples = new ArrayList<>();
+        tuples.add(new EventImpl<>(mock(Record.class), 0l, mock(Record.class)));
+        WindowLifecycleListener<Event<Record<Long>>> eventWindowLifecycleListener =
+                windowFunctionExecutor.newWindowLifecycleListener(context);
+
+        eventWindowLifecycleListener.onExpiry(tuples);
+        for (Event<Record<Long>> tuple : tuples) {
+            verify(tuple.getRecord(), times(0)).ack();
+        }
+
+        eventWindowLifecycleListener.onActivation(tuples, new ArrayList<>(), new ArrayList<>(), 0l);
+        for (Event<Record<Long>> tuple : tuples) {
+            verify(tuple.get(), times(1)).ack();
+        }
+    }
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -361,10 +371,26 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
             functionDetailsBuilder.setBuiltin(builtin);
         }
 
-        return functionDetailsBuilder.build();
+        return validateFunctionDetails(functionDetailsBuilder.build());
+    }
+
+    public static FunctionDetails validateFunctionDetails(FunctionDetails functionDetails)
+            throws IllegalArgumentException {
+        if (!functionDetails.getAutoAck() && functionDetails.getProcessingGuarantees()
+                == Function.ProcessingGuarantees.ATMOST_ONCE) {
+            throw new IllegalArgumentException("When Guarantees == ATMOST_ONCE, autoAck must be equal to true,"
+                    + "This is a contradictory configuration, autoAck will be removed later,"
+                    + "Please refer to PIP: https://github.com/apache/pulsar/issues/15560");

Review Comment:
   Split into multiple sentences and space after punctuation.
   
   ```suggestion
               throw new IllegalArgumentException("When Guarantees == ATMOST_ONCE, autoAck must be equal to true."
                       + " This is a contradictory configuration, autoAck will be removed later."
                       + " Please refer to PIP: https://github.com/apache/pulsar/issues/15560");
   ```



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarFunctionRecord.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function;
+
+/**
+ * The record returned by the proxy to the user.
+ */
+@Slf4j
+public class PulsarFunctionRecord<T> implements Record<T> {
+
+    private final Function.FunctionDetails functionConfig;
+    private final Record<T> record;
+
+    public PulsarFunctionRecord(Record<T> record, Function.FunctionDetails functionConfig) {
+        this.functionConfig = functionConfig;
+        this.record = record;

Review Comment:
   For readability, field and assignment order match the constructor parameter order.
   
   ```suggestion
       private final Record<T> record;
       private final Function.FunctionDetails functionConfig;
   
       public PulsarFunctionRecord(Record<T> record, Function.FunctionDetails functionConfig) {
           this.record = record;
           this.functionConfig = functionConfig;
   ```



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -322,6 +331,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
             functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
         }
 
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/instance/src/main/python/python_instance_main.py:
##########
@@ -97,6 +97,20 @@ def main():
     args.function_details = args.function_details[:-1]
   json_format.Parse(args.function_details, function_details)
 
+  if function_details.processingGuarantees == "EFFECTIVELY_ONCE":
+    print("Python instance current not support EFFECTIVELY_ONCE processing guarantees.")
+    sys.exit(1)
+
+  if function_details.autoAck == False and function_details.processingGuarantees == "ATMOST_ONCE" \
+          or function_details.processingGuarantees == "ATLEAST_ONCE":
+    print("When Guarantees == " + function_details.processingGuarantees + ", autoAck must be equal to true, "
+          "This is a contradictory configuration, autoAck will be removed later,"
+          "If you want not to automatically ack, please configure the processing guarantees as MANUAL."
+          "This is a contradictory configuration, autoAck will be removed later," 
+          "Please refer to PIP: https://github.com/apache/pulsar/issues/15560")
+    sys.exit(1)
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java:
##########
@@ -63,6 +64,21 @@ public static class TestSinkConfig {
         private String configParameter;
     }
 
+    @Test
+    public void testAutoAckConvertFailed() throws IOException {
+
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setAutoAck(false);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
+
+        try {
+            SinkConfigUtils.convert(sinkConfig,
+                    new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            fail("Should is failed");
+        } catch (IllegalArgumentException e) {
+        }

Review Comment:
   ```suggestion
           assertThrows(IllegalArgumentException.class, () -> {
               SinkConfigUtils.convert(sinkConfig,
                       new SinkConfigUtils.ExtractedSinkDetails(null, null));
           });
   ```



##########
pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java:
##########
@@ -46,13 +46,29 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 /**
  * Unit test of {@link Reflections}.
  */
 @Slf4j
 public class FunctionConfigUtilsTest {
 
+    @Test
+    public void testAutoAckConvertFailed() {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setAutoAck(false);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
+
+        try {
+            FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+            fail("Should is failed");
+        } catch (IllegalArgumentException e) {
+
+        }

Review Comment:
   ```suggestion
           assertThrows(IllegalArgumentException.class, () -> {
               FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);            
           });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16279:
URL: https://github.com/apache/pulsar/pull/16279


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r916533428


##########
pulsar-function-go/pf/instanceConf_test.go:
##########
@@ -96,3 +98,18 @@ func TestInstanceConf_GetInstanceName(t *testing.T) {
 
 	assert.Equal(t, "101", instanceName)
 }
+
+func TestInstanceConf_Fail(t *testing.T) {
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 0, AutoACK: false})
+	}, "Should have a panic")
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 1, AutoACK: false})
+	}, "Should have a panic")
+	assert.Panics(t, func() {
+		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 2})
+	}, "Should have a panic")

Review Comment:
   It's more like a bug fix, In `go` instance, it does not support `exactly-once`. It was not checked at startup before, which will cause the function to not ack any message. `python` instance has the same problem.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] freeznet commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
freeznet commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r932916868


##########
site2/docs/functions-concepts.md:
##########
@@ -167,6 +169,9 @@ Both processing time and event time are supported.
  * Processing time is defined based on the wall time when the function instance builds and processes a window. The judging of window completeness is straightforward and you don’t have to worry about data arrival disorder. 
  * Event time is defined based on the timestamps that come with the event record. It guarantees event time correctness but also offers more data buffering and a limited completeness guarantee.
 
+Delivery Semantic Guarantees.
+ * Currently, window function does not support `Effectively-once` delivery semantics.

Review Comment:
   Is `MANUAL` supported in Window Function?



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -308,11 +308,20 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
         // windowing related
         WindowConfig windowConfig = functionConfig.getWindowConfig();
         if (windowConfig != null) {
+            // Windows Function not support EFFECTIVELY_ONCE
+            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException(
+                        "Windows Function not support EFFECTIVELY_ONCE delivery semantics.");
+            } else {
+                // Override functionConfig.getProcessingGuarantees to MANUAL, and set windowsFunction is guarantees
+                windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees

Review Comment:
   `WindowConfig.ProcessingGuarantees` only support `ATMOST_ONCE` and `ATLEAST_ONCE`, but `functionDetails.ProcessingGuarantees` do have `MANUAL` and `EFFECTIVELY_ONCE` as well. This block do ignores `EFFECTIVELY_ONCE` case, but what if the `ProcessingGuarantees` is `MANUAL`? The `WindowConfig.ProcessingGuarantees.valueOf` may throw `IllegalArgumentException`, is it as expected? If yes, can we have some better logging here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1173464301

   > @asafm @nlu90 @freeznet @eolivelli @codelipenghui Please help to review this, I am coding on `go` and `python` function instances, and will open a new PR. Thank you
   
   `go` and `python` modifications of XX have been submitted to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r932995851


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -308,11 +308,20 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
         // windowing related
         WindowConfig windowConfig = functionConfig.getWindowConfig();
         if (windowConfig != null) {
+            // Windows Function not support EFFECTIVELY_ONCE
+            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException(
+                        "Windows Function not support EFFECTIVELY_ONCE delivery semantics.");
+            } else {
+                // Override functionConfig.getProcessingGuarantees to MANUAL, and set windowsFunction is guarantees
+                windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees

Review Comment:
   Thanks, `WindowsFunction` does not support `MANUAL`, I will add some checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1174486236

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r932997157


##########
site2/docs/functions-concepts.md:
##########
@@ -167,6 +169,9 @@ Both processing time and event time are supported.
  * Processing time is defined based on the wall time when the function instance builds and processes a window. The judging of window completeness is straightforward and you don’t have to worry about data arrival disorder. 
  * Event time is defined based on the timestamps that come with the event record. It guarantees event time correctness but also offers more data buffering and a limited completeness guarantee.
 
+Delivery Semantic Guarantees.
+ * Currently, window function does not support `Effectively-once` delivery semantics.

Review Comment:
   No, I updated the documentation
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1200075730

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1200358679

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#issuecomment-1172140864

   @asafm @nlu90 @freeznet @eolivelli @codelipenghui Please help to review this, I am coding on `go` and `python` function instances, and will open a new PR. Thank you


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r912786081


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -350,7 +351,8 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
         }
     }
 
-    private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    @VisibleForTesting
+    protected void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {

Review Comment:
   I need to test it, so change to protected and add @VisibleForTesting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org