Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/13 13:52:37 UTC

[GitHub] [pulsar] cbornet opened a new pull request, #16041: [improve][function] Support Record as Function output type

cbornet opened a new pull request, #16041:
URL: https://github.com/apache/pulsar/pull/16041

   
   ### Motivation
   
   Currently, when a user wants to dynamically set the output topic or the message properties, or change the output schema in a Function, the only option is to create a Function that returns `Void`, uses the context, and manually creates a message with `Context::newOutputMessage`. The [TypedMessageBuilderPublish](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java) Function in `pulsar-functions-api-examples` shows how to do that.
   This approach is not intuitive; it would be better to return a structure like `Record` that carries this info.
   
   ### Modifications
   
   This PR adds support for returning `Record` in a Function.
   * In `JavaInstanceRunnable::sendOutputMessage`, we check the type of the output object and if it's of type `Record` we create a `TargetSinkRecord` instead of a `SinkRecord`. `TargetSinkRecord` uses the info from the output record in the various Record methods that are used by the Sink when creating the output message.
   * When registering the Function, in `getFunctionTypes`, if the output type is a Record, we get the wrapped type as type for the Sink.
   * A utility method `newOutputRecordBuilder` is added to `Context` that returns a `FunctionRecord` builder initialized with the info from the source record. The builder methods can then be used to override these values as needed.
   * A `RecordFunction` is added to the Function examples to demonstrate the use of this feature.
   * An integration test is added to `PulsarFunctionsJavaTest`.
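
The builder behaviour described in the list above (start from the source record's values, then override only what needs to change) can be sketched in plain Java. This is a minimal illustration, not the Pulsar implementation: `OutRecord`, `OutRecordBuilder` and `Ctx` are hypothetical stand-ins for `Record`, `FunctionRecord.FunctionRecordBuilder` and `Context`, and the `urgent-topic` routing rule is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class RecordOutputSketch {

    /** Hypothetical stand-in for an output record: a value plus routing metadata. */
    static final class OutRecord<T> {
        final T value;
        final String destinationTopic;
        final String key;
        final Map<String, String> properties;

        OutRecord(T value, String destinationTopic, String key, Map<String, String> properties) {
            this.value = value;
            this.destinationTopic = destinationTopic;
            this.key = key;
            this.properties = properties;
        }
    }

    /** Hypothetical builder: each setter overrides a value inherited from the source. */
    static final class OutRecordBuilder<T> {
        private T value;
        private String destinationTopic;
        private String key;
        private Map<String, String> properties = new HashMap<>();

        OutRecordBuilder<T> value(T v) { this.value = v; return this; }
        OutRecordBuilder<T> destinationTopic(String t) { this.destinationTopic = t; return this; }
        OutRecordBuilder<T> key(String k) { this.key = k; return this; }
        OutRecordBuilder<T> properties(Map<String, String> p) { this.properties = new HashMap<>(p); return this; }
        OutRecord<T> build() { return new OutRecord<>(value, destinationTopic, key, properties); }
    }

    /** Hypothetical stand-in for the function context and its current input record. */
    static final class Ctx {
        final String outputTopic;
        final String inputKey;
        final Map<String, String> inputProperties;

        Ctx(String outputTopic, String inputKey, Map<String, String> inputProperties) {
            this.outputTopic = outputTopic;
            this.inputKey = inputKey;
            this.inputProperties = inputProperties;
        }

        /** Returns a builder pre-filled with the configured output topic and the input metadata. */
        <T> OutRecordBuilder<T> newOutputRecordBuilder() {
            return new OutRecordBuilder<T>()
                    .destinationTopic(outputTopic)
                    .key(inputKey)
                    .properties(inputProperties);
        }
    }

    /** A function body that returns a record instead of publishing through the context. */
    static OutRecord<String> process(String input, Ctx ctx) {
        OutRecordBuilder<String> builder = ctx.newOutputRecordBuilder();
        builder.value(input.toUpperCase());
        if (input.startsWith("!")) {
            // Override the inherited destination only for this message.
            builder.destinationTopic("urgent-topic");
        }
        return builder.build();
    }
}
```

In this sketch the key and properties carry over from the input untouched, and only the destination is changed per message, which is the use case the Motivation section describes.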
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   Run `PulsarFunctionsJavaTest::testRecordFunctionTest`
   
   ### Does this pull request potentially affect one of the following parts:
   yes
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
    Yes. This adds `Record<>` as a possible return type for Functions
   
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896644084


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java:
##########
@@ -120,8 +121,20 @@ public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfi
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {

Review Comment:
   I'll add UT for all the cases (Record, non-Record, j.u.f.Function, WindowFunction, ...)
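
The idea behind the `Record` branch (use the wrapped type rather than the wrapper as the output type) can be illustrated with the standard reflection API alone. This is only a sketch under assumptions: `Envelope<T>` is a hypothetical stand-in for `Record<T>`, `outputType` is an invented helper, and it only inspects directly implemented interfaces, whereas the PR relies on `TypeResolver`, whose handling of the remaining cases is not shown in this thread.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class OutputTypeSketch {

    /** Hypothetical stand-in for a record wrapper type. */
    interface Envelope<T> {
        T getValue();
    }

    /** Declares a plain output type. */
    static final class PlainFn implements Function<String, Integer> {
        public Integer apply(String s) { return s.length(); }
    }

    /** Declares a wrapped output type. */
    static final class EnvelopeFn implements Function<String, Envelope<Integer>> {
        public Envelope<Integer> apply(String s) { return () -> s.length(); }
    }

    /** Returns the declared output class, unwrapping Envelope<X> to X. */
    static Class<?> outputType(Class<?> fnClass) {
        for (Type t : fnClass.getGenericInterfaces()) {
            if (!(t instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType pt = (ParameterizedType) t;
            if (pt.getRawType() != Function.class) {
                continue;
            }
            Type out = pt.getActualTypeArguments()[1];
            if (out instanceof ParameterizedType) {
                ParameterizedType outPt = (ParameterizedType) out;
                Type inner = outPt.getActualTypeArguments()[0];
                if (outPt.getRawType() == Envelope.class && inner instanceof Class) {
                    // Wrapped output: report the wrapped type.
                    return (Class<?>) inner;
                }
                // Some other generic output type: report its raw class.
                return (Class<?>) outPt.getRawType();
            }
            if (out instanceof Class) {
                return (Class<?>) out;
            }
        }
        throw new IllegalArgumentException("No directly implemented Function interface on " + fnClass);
    }
}
```

Unit tests along these lines would call the resolver once per declared shape (wrapped, unwrapped, and so on) and compare the result with the expected class.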





[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896875138


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java:
##########
@@ -166,7 +166,7 @@ public void testSlidingCountWindowTest() throws Exception {
     @Test(groups = {"java_function", "function"})
     public void testMergeFunctionTest() throws Exception {
 	    testMergeFunction();
-   }
+    }

Review Comment:
   I completely realigned this block, but I guess the full class should be realigned.
   Maybe it would be better to do that in another PR?





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896487789


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java:
##########
@@ -120,8 +121,20 @@ public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfi
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {

Review Comment:
   we need some unit test coverage for these branches



##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+        Record<?> currentRecord = context.getCurrentRecord();
+        FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
+                .destinationTopic(context.getOutputTopic())
+                .properties(currentRecord.getProperties());
+        currentRecord.getTopicName().ifPresent(builder::topicName);
+        currentRecord.getKey().ifPresent(builder::key);
+        currentRecord.getEventTime().ifPresent(builder::eventTime);
+        currentRecord.getPartitionId().ifPresent(builder::partitionId);
+        currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
+        currentRecord.getRecordSequence().ifPresent(builder::recordSequence);
+
+        // TODO: add message

Review Comment:
   if you want to do this, please create a ticket and link it, otherwise please remove this "TODO"
   TODOs are usually some kind of code smell, especially in a big open source project like Pulsar.
   
   we could explain why the "message" is not available here



##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java:
##########
@@ -166,7 +166,7 @@ public void testSlidingCountWindowTest() throws Exception {
     @Test(groups = {"java_function", "function"})
     public void testMergeFunctionTest() throws Exception {
 	    testMergeFunction();
-   }
+    }

Review Comment:
   nit: remove space



##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {

Review Comment:
   we need unit test coverage for this method (ensure that every field is properly handled)



##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java:
##########
@@ -161,4 +162,11 @@ public interface Context extends BaseContext {
      * @throws PulsarClientException
      */
     <X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;
+
+    /**
+     * Create a FunctionRecordBuilder.

Review Comment:
   please explain a little bit how this is supposed to be used.





[GitHub] [pulsar] tisonkun commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1191319648

   @cbornet @momo-jun is there any existing doc we can improve?
   
   Perhaps we should place the `RecordFunction` demo under "Pulsar Functions :: How to: Develop" entry with a new page?




[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896517241


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java:
##########
@@ -120,8 +121,20 @@ public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfi
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {

Review Comment:
   This branch is covered by an integration test in PulsarFunctionTest.
   I could also cover the j.u.f.Function case if needed.
   I don't think there are unit tests for the non-`Record` types. Should we add some?





[GitHub] [pulsar] nlu90 commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
nlu90 commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r898678935


##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    /**
+     * Creates a builder for a Record from a Function Context.
+     * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime,
+     * properties, partitionId, partitionIndex and recordSequence from the Context input Record.
+     * It doesn't initialize a Message at the moment.
+     *
+     * @param context a Function Context
+     * @param <T> type of Record to build
+     * @return a Record builder initialised with values from the Function Context
+     */
+     public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {

Review Comment:
   Could you help me understand why the whole `context` is passed here, instead of the `currentRecord`?



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java:
##########
@@ -481,6 +482,11 @@ public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws Pulsar
         return this.client.newConsumer(schema);
     }
 
+    @Override
+    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {

Review Comment:
   Instead of adding a new API here, have you considered creating a util class with a record generation method?





[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896514640


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java:
##########
@@ -166,7 +166,7 @@ public void testSlidingCountWindowTest() throws Exception {
     @Test(groups = {"java_function", "function"})
     public void testMergeFunctionTest() throws Exception {
 	    testMergeFunction();
-   }
+    }

Review Comment:
   This is to align with the rest of the function on 4 spaces.
   But this whole class is misaligned...





[GitHub] [pulsar] tisonkun commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r1067761272


##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")

Review Comment:
   Is it necessary to suppress generating the builder method?





[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896875504


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java:
##########
@@ -120,8 +121,20 @@ public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfi
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, userClass);
+                if (typeArgs[1].equals(Record.class)) {

Review Comment:
   done





[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896520082


##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {
+        Record<?> currentRecord = context.getCurrentRecord();
+        FunctionRecordBuilder<T> builder = new FunctionRecordBuilder<T>()
+                .destinationTopic(context.getOutputTopic())
+                .properties(currentRecord.getProperties());
+        currentRecord.getTopicName().ifPresent(builder::topicName);
+        currentRecord.getKey().ifPresent(builder::key);
+        currentRecord.getEventTime().ifPresent(builder::eventTime);
+        currentRecord.getPartitionId().ifPresent(builder::partitionId);
+        currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex);
+        currentRecord.getRecordSequence().ifPresent(builder::recordSequence);
+
+        // TODO: add message

Review Comment:
   Sorry, I forgot to remove that one.
   I totally agree that TODOs must not hit main branch 😄 





[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r898829798


##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    /**
+     * Creates a builder for a Record from a Function Context.
+     * The builder is initialized with the output topic from the Context and with the topicName, key, eventTime,
+     * properties, partitionId, partitionIndex and recordSequence from the Context input Record.
+     * It doesn't initialize a Message at the moment.
+     *
+     * @param context a Function Context
+     * @param <T> type of Record to build
+     * @return a Record builder initialised with values from the Function Context
+     */
+     public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {

Review Comment:
   We set the `destinationTopic` from the Context `getOutputTopic`





[GitHub] [pulsar] cbornet commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1157377164

   > LGTM, but I do have a question, it seems this method can be a replacement of newOutputMessage, so is it possible to deprecate newOutputMessage? if not, could you please add some more context comparing newOutputRecordBuilder and newOutputMessage? thanks.
   
   Those are two different approaches. `newOutputRecordBuilder` is useful if you design a Function that returns a `Record`. `newOutputMessage` is more generic and could be used, for instance, to send multiple messages to multiple topics.
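
The difference between the two approaches can be sketched with stand-in types. Nothing here is the Pulsar API: `Out`, `Ctx` and `publish` are hypothetical, and the topic names are invented. The first method returns a single output that the runtime would send, and the second returns nothing and publishes any number of messages itself.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoApproachesSketch {

    /** Hypothetical stand-in for an outgoing message or record. */
    static final class Out {
        final String topic;
        final String value;

        Out(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Hypothetical context that remembers everything published through it. */
    static final class Ctx {
        final String outputTopic;
        final List<Out> published = new ArrayList<>();

        Ctx(String outputTopic) {
            this.outputTopic = outputTopic;
        }

        void publish(String topic, String value) {
            published.add(new Out(topic, value));
        }
    }

    /** Record-style: exactly one output per input, handed back to the runtime. */
    static Out returnRecord(String input, Ctx ctx) {
        return new Out(ctx.outputTopic, input.trim());
    }

    /** Message-style: no return value; fan out to several topics by hand. */
    static void fanOut(String input, Ctx ctx) {
        for (String part : input.split(",")) {
            String word = part.trim();
            String topic = word.length() > 3 ? "long-words" : "short-words";
            ctx.publish(topic, word);
        }
    }
}
```

The record-style method stays a pure input-to-output mapping, while the message-style method is the one to reach for when a single input produces several outputs.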




[GitHub] [pulsar] cbornet commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1156596619

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] eolivelli merged pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #16041:
URL: https://github.com/apache/pulsar/pull/16041




[GitHub] [pulsar] momo-jun commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1161068313

   Hi @cbornet , will you update relevant docs in a follow-up PR?




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r898767431


##########
tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java:
##########
@@ -166,7 +166,7 @@ public void testSlidingCountWindowTest() throws Exception {
     @Test(groups = {"java_function", "function"})
     public void testMergeFunctionTest() throws Exception {
 	    testMergeFunction();
-   }
+    }

Review Comment:
   Yes, this PR should focus on the feature and not on code cleanup.



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java:
##########
@@ -481,6 +482,11 @@ public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws Pulsar
         return this.client.newConsumer(schema);
     }
 
+    @Override
+    public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() {

Review Comment:
   I think it is better not to add a general-purpose API to build `Record` instances.
   A Record is like a Message, and you cannot build a Message using the client API or a utility class.
   The record may be tied to some internal context or be pre-configured (as it currently is in this PR).
   
   So it is better to use Context as the starting point for building a new record, the same way it works with newOutputMessage().
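
The design argument above can be sketched without any Pulsar dependency. `SketchRecord` and `SketchRecordContext` are hypothetical stand-ins, not the real `FunctionRecord` or `Context`. The assumption illustrated is only that a builder handed out by the context can arrive pre-configured (here, seeded with the key of the record being processed), which a free-standing utility class could not do.

```java
import java.util.Optional;

// Hypothetical stand-in for illustration; this is NOT the real FunctionRecord.
final class SketchRecord {
    final String value;
    final String key;
    private final String destinationTopic; // null: fall back to the default output topic

    private SketchRecord(String value, String key, String destinationTopic) {
        this.value = value;
        this.key = key;
        this.destinationTopic = destinationTopic;
    }

    Optional<String> getDestinationTopic() {
        return Optional.ofNullable(destinationTopic);
    }

    static final class Builder {
        private String value;
        private String key;
        private String destinationTopic;

        // Not public: callers are expected to obtain builders from the context.
        Builder(String seededKey) {
            this.key = seededKey;
        }

        Builder value(String value) {
            this.value = value;
            return this;
        }

        Builder key(String key) {
            this.key = key;
            return this;
        }

        Builder destinationTopic(String topic) {
            this.destinationTopic = topic;
            return this;
        }

        SketchRecord build() {
            return new SketchRecord(value, key, destinationTopic);
        }
    }
}

// Hypothetical stand-in for the function context.
final class SketchRecordContext {
    private final String currentKey;

    SketchRecordContext(String currentKey) {
        this.currentKey = currentKey;
    }

    // Entry point: the builder is seeded with the key of the record being processed.
    SketchRecord.Builder newOutputRecordBuilder() {
        return new SketchRecord.Builder(currentKey);
    }
}
```

A caller would write something like `ctx.newOutputRecordBuilder().value("hello").destinationTopic("audit").build()` and get the input key carried over without setting it explicitly.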
   





[GitHub] [pulsar] cbornet commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1155196275

   > Is this change ABI compatible so that existing user created Pulsar Functions implementations can be run without recompiling when upgrading to a version that includes this change?
   
   @lhotari I think it is, since we only add a method to the `Context` API.
   `SinkRecord` is sometimes used outside of the package in tests (and probably shouldn't be), but I think extracting some methods to a superclass is OK for binary compatibility. Or is it not?
   Do you see other things that could break?




[GitHub] [pulsar] cbornet commented on a diff in pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#discussion_r896900334


##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/FunctionRecord.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
+
+@Builder(builderMethodName = "")
+public class FunctionRecord<T> implements Record<T> {
+
+    private final T value;
+    private final String topicName;
+    private final String destinationTopic;
+    private final Map<String, String> properties;
+    private final String key;
+    private final Schema<T> schema;
+    private final Long eventTime;
+    private final String partitionId;
+    private final Integer partitionIndex;
+    private final Long recordSequence;
+
+    public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) {

Review Comment:
   done



##########
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java:
##########
@@ -161,4 +162,11 @@ public interface Context extends BaseContext {
      * @throws PulsarClientException
      */
     <X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;
+
+    /**
+     * Create a FunctionRecordBuilder.

Review Comment:
   done





[GitHub] [pulsar] codelipenghui commented on pull request #16041: [improve][function] Support Record as Function output type

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16041:
URL: https://github.com/apache/pulsar/pull/16041#issuecomment-1156559798

   @nlu90 Please help review this PR.

