You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/04 08:56:48 UTC

[GitHub] [pulsar] nahguam commented on a diff in pull request #16279: [refactor][function][PIP-166] Function add MANUAL delivery semantics

nahguam commented on code in PR #16279:
URL: https://github.com/apache/pulsar/pull/16279#discussion_r912746018


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -350,7 +351,8 @@ private StateStoreProvider getStateStoreProvider() throws Exception {
         }
     }
 
-    private void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
+    @VisibleForTesting
+    protected void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {

Review Comment:
   would package private be more suitable?



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java:
##########
@@ -275,8 +278,14 @@ public X process(T input, Context context) throws Exception {
             initialize(context);
         }
 
+        // record must is PulsarFunctionRecord.

Review Comment:
   ```suggestion
           // record must be PulsarFunctionRecord.
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java:
##########
@@ -360,6 +360,48 @@ public Optional<String> getDestinationTopic() {
             }, "out1");
 
 
+            pulsarSink.write(record);
+
+            Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkManualProcessor);
+            PulsarSink.PulsarSinkManualProcessor pulsarSinkManualProcessor
+                    = (PulsarSink.PulsarSinkManualProcessor) pulsarSink.pulsarSinkProcessor;
+            if (topic != null) {
+                Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(topic));
+            } else {
+                Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(defaultTopic));
+            }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(otherTopic -> {
+                if (topic != null) {
+                    return topic.equals(otherTopic);
+                } else {
+                    return defaultTopic.equals(otherTopic);
+                }
+            }));
+        }
+
+        /** test At-least-once **/
+        pulsarClient = getPulsarClient();
+        pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE);
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        for (String topic : topics) {
+
+            SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+
+                @Override
+                public String getValue() {
+                    return "in1";
+                }
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    return getTopicOptional(topic);
+                }
+            }, "out1");
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarFunctionRecordTest.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.*;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+public class PulsarFunctionRecordTest {
+
+    @Test
+    public void testAck() {
+        Record record = mock(Record.class);
+        Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATMOST_ONCE).build();
+        PulsarFunctionRecord pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.EFFECTIVELY_ONCE).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(0)).ack();
+
+        clearInvocations(record);
+        functionDetails = Function.FunctionDetails.newBuilder().setAutoAck(true)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL).build();
+        pulsarFunctionRecord = new PulsarFunctionRecord<>(record, functionDetails);
+        pulsarFunctionRecord.ack();
+        verify(record, times(1)).ack();
+    }
+}

Review Comment:
   ```suggestion
   }
   
   ```



##########
pulsar-function-go/conf/conf.go:
##########
@@ -49,8 +49,9 @@ type Conf struct {
 	ProcessingGuarantees int32  `json:"processingGuarantees" yaml:"processingGuarantees"`
 	SecretsMap           string `json:"secretsMap" yaml:"secretsMap"`
 	Runtime              int32  `json:"runtime" yaml:"runtime"`
-	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
-	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
+	//Deprecated
+	AutoACK     bool  `json:"autoAck" yaml:"autoAck"`
+	Parallelism int32 `json:"parallelism" yaml:"parallelism"`

Review Comment:
   Restore the type alignment
   
   ```suggestion
   	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
   	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
   ```



##########
pulsar-function-go/pf/instanceConf.go:
##########
@@ -103,6 +103,21 @@ func newInstanceConf() *instanceConf {
 			UserConfig: cfg.UserConfig,
 		},
 	}
+
+	if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
+		panic("Go instance current not support EFFECTIVELY_ONCE processing guarantees.")
+	}
+
+	if instanceConf.funcDetails.AutoAck == false &&
+		(instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE ||
+			instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE) {
+		panic("When Guarantees == " + instanceConf.funcDetails.ProcessingGuarantees.String() +
+			", autoAck must be equal to true, If you want not to automatically ack, " +
+			"please configure the processing guarantees as MANUAL." +
+			"This is a contradictory configuration, autoAck will be removed later," +
+			"Please refer to PIP: https://github.com/apache/pulsar/issues/15560")

Review Comment:
   ```suggestion
   			", autoAck must be equal to true. If you want not to automatically ack, " +
   			"please configure the processing guarantees as MANUAL." +
   			" This is a contradictory configuration, autoAck will be removed later." +
   			" Please refer to PIP: https://github.com/apache/pulsar/issues/15560")
   ```



##########
pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java:
##########
@@ -127,6 +131,48 @@ public void cleanUp() {
         testWindowedPulsarFunction.shutdown();
     }
 
+    @Test
+    public void testWindowFunctionWithAtmostOnce() throws Exception {
+        windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees.ATMOST_ONCE);
+        doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))).when(context)
+                .getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+        Record record = mock(Record.class);
+        when(context.getCurrentRecord()).thenReturn(record);
+        doReturn(Optional.of("test-topic")).when(record).getTopicName();
+        doReturn(record).when(context).getCurrentRecord();
+        doReturn(100l).when(record).getValue();
+        testWindowedPulsarFunction.process(10L, context);
+        verify(record, times(1)).ack();
+    }
+
+    @Test
+    public void testWindowFunctionWithAtleastOnce() throws Exception {
+
+        WindowConfig config = new WindowConfig();
+        config.setProcessingGuarantees(WindowConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        WindowFunctionExecutor windowFunctionExecutor = spy(WindowFunctionExecutor.class);
+        windowFunctionExecutor.windowConfig = config;
+        doNothing().when(windowFunctionExecutor).initialize(any());
+        doReturn(new Object()).when(windowFunctionExecutor).process(any(Window.class), any(WindowContext.class));
+        doReturn(CompletableFuture.completedFuture(null)).when(context).publish(any(), any(), any());
+
+        List<Event<Record<Long>>> tuples = new ArrayList<>();
+        tuples.add(new EventImpl<>(mock(Record.class), 0l, mock(Record.class)));
+        WindowLifecycleListener<Event<Record<Long>>> eventWindowLifecycleListener =
+                windowFunctionExecutor.newWindowLifecycleListener(context);
+
+        eventWindowLifecycleListener.onExpiry(tuples);
+        for (Event<Record<Long>> tuple : tuples) {
+            verify(tuple.getRecord(), times(0)).ack();
+        }
+
+        eventWindowLifecycleListener.onActivation(tuples, new ArrayList<>(), new ArrayList<>(), 0l);
+        for (Event<Record<Long>> tuple : tuples) {
+            verify(tuple.get(), times(1)).ack();
+        }
+    }
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -361,10 +371,26 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
             functionDetailsBuilder.setBuiltin(builtin);
         }
 
-        return functionDetailsBuilder.build();
+        return validateFunctionDetails(functionDetailsBuilder.build());
+    }
+
+    public static FunctionDetails validateFunctionDetails(FunctionDetails functionDetails)
+            throws IllegalArgumentException {
+        if (!functionDetails.getAutoAck() && functionDetails.getProcessingGuarantees()
+                == Function.ProcessingGuarantees.ATMOST_ONCE) {
+            throw new IllegalArgumentException("When Guarantees == ATMOST_ONCE, autoAck must be equal to true,"
+                    + "This is a contradictory configuration, autoAck will be removed later,"
+                    + "Please refer to PIP: https://github.com/apache/pulsar/issues/15560");

Review Comment:
   Split into multiple sentences and space after punctuation.
   
   ```suggestion
               throw new IllegalArgumentException("When Guarantees == ATMOST_ONCE, autoAck must be equal to true."
                       + " This is a contradictory configuration, autoAck will be removed later."
                       + " Please refer to PIP: https://github.com/apache/pulsar/issues/15560");
   ```



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarFunctionRecord.java:
##########
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.proto.Function;
+
+/**
+ * The record returned by the proxy to the user.
+ */
+@Slf4j
+public class PulsarFunctionRecord<T> implements Record<T> {
+
+    private final Function.FunctionDetails functionConfig;
+    private final Record<T> record;
+
+    public PulsarFunctionRecord(Record<T> record, Function.FunctionDetails functionConfig) {
+        this.functionConfig = functionConfig;
+        this.record = record;

Review Comment:
   For readability, field and assignment order match the constructor parameter order.
   
   ```suggestion
       private final Record<T> record;
       private final Function.FunctionDetails functionConfig;
   
       public PulsarFunctionRecord(Record<T> record, Function.FunctionDetails functionConfig) {
           this.record = record;
           this.functionConfig = functionConfig;
   ```



##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java:
##########
@@ -322,6 +331,7 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFu
             functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
         }
 
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/instance/src/main/python/python_instance_main.py:
##########
@@ -97,6 +97,20 @@ def main():
     args.function_details = args.function_details[:-1]
   json_format.Parse(args.function_details, function_details)
 
+  if function_details.processingGuarantees == "EFFECTIVELY_ONCE":
+    print("Python instance current not support EFFECTIVELY_ONCE processing guarantees.")
+    sys.exit(1)
+
+  if function_details.autoAck == False and function_details.processingGuarantees == "ATMOST_ONCE" \
+          or function_details.processingGuarantees == "ATLEAST_ONCE":
+    print("When Guarantees == " + function_details.processingGuarantees + ", autoAck must be equal to true, "
+          "This is a contradictory configuration, autoAck will be removed later,"
+          "If you want not to automatically ack, please configure the processing guarantees as MANUAL."
+          "This is a contradictory configuration, autoAck will be removed later," 
+          "Please refer to PIP: https://github.com/apache/pulsar/issues/15560")
+    sys.exit(1)
+
+

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java:
##########
@@ -63,6 +64,21 @@ public static class TestSinkConfig {
         private String configParameter;
     }
 
+    @Test
+    public void testAutoAckConvertFailed() throws IOException {
+
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setAutoAck(false);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
+
+        try {
+            SinkConfigUtils.convert(sinkConfig,
+                    new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            fail("Should is failed");
+        } catch (IllegalArgumentException e) {
+        }

Review Comment:
   ```suggestion
           assertThrows(IllegalArgumentException.class, () -> {
               SinkConfigUtils.convert(sinkConfig,
                       new SinkConfigUtils.ExtractedSinkDetails(null, null));
           });
   ```



##########
pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java:
##########
@@ -46,13 +46,29 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 /**
  * Unit test of {@link Reflections}.
  */
 @Slf4j
 public class FunctionConfigUtilsTest {
 
+    @Test
+    public void testAutoAckConvertFailed() {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setAutoAck(false);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
+
+        try {
+            FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);
+            fail("Should is failed");
+        } catch (IllegalArgumentException e) {
+
+        }

Review Comment:
   ```suggestion
           assertThrows(IllegalArgumentException.class, () -> {
               FunctionConfigUtils.convert(functionConfig, (ClassLoader) null);            
           });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org