Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/29 20:11:57 UTC

[GitHub] [pulsar] cbornet opened a new pull request, #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

cbornet opened a new pull request, #16740:
URL: https://github.com/apache/pulsar/pull/16740

   ### Motivation
   
   For PIP-193: Sink preprocessing Function (#16739).
   Implements the changes needed to support the Sink transform function, except for `LocalRunner` support.
   
   ### Modifications
   
   See the Implementation section of #16739.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   TODO
   
   ### Does this pull request potentially affect one of the following parts:
   
     - The public API: yes
     - The rest endpoints: yes
     - The admin cli options: yes
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1325396510

   Transform functions are not part of 2.11, so we need to wait for the release before writing the docs.




[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r954976705


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {
+
+  AtomicLong latestVersion = new AtomicLong(0);
+  ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();

Review Comment:
   I trimmed the code a little. Is it clearer now?
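The `SinkSchemaInfoProvider` under review declares an `AtomicLong latestVersion` and a `ConcurrentHashMap<SchemaVersion, SchemaInfo>`. The rest of the class is not shown here, so what follows is only a stdlib sketch of one plausible pattern for such fields, assuming each distinct schema receives the next version number and an identical schema reuses its existing version. `InMemorySchemaRegistry`, `register`, `lookup` and `latest` are hypothetical names, and plain strings stand in for Pulsar's `SchemaInfo` and `SchemaVersion` types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory registry (not Pulsar's SinkSchemaInfoProvider).
// Schemas are represented by their definition string.
class InMemorySchemaRegistry {
    // Monotonically increasing counter for the most recently assigned version.
    private final AtomicLong latestVersion = new AtomicLong(0);
    // Identical schema definitions map to the same version.
    private final Map<String, Long> versionsBySchema = new ConcurrentHashMap<>();
    // Reverse index used to resolve a version back to its schema.
    private final Map<Long, String> schemasByVersion = new ConcurrentHashMap<>();

    // Returns the existing version for a known schema, or assigns the next one.
    long register(String schemaDefinition) {
        return versionsBySchema.computeIfAbsent(schemaDefinition, def -> {
            long version = latestVersion.incrementAndGet();
            schemasByVersion.put(version, def);
            return version;
        });
    }

    // Returns the schema registered under the given version, or null if unknown.
    String lookup(long version) {
        return schemasByVersion.get(version);
    }

    // Returns the highest version assigned so far (0 if nothing was registered).
    long latest() {
        return latestVersion.get();
    }
}
```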





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931095517


##########
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java:
##########
@@ -37,6 +38,7 @@ public Record<String> process(String input, Context context) throws Exception {
         return context.<String>newOutputRecordBuilder()
                 .destinationTopic(publishTopic)
                 .value(output)
+                .schema(Schema.STRING)

Review Comment:
   Using those functions won't affect the result.
   Maybe your function also has to publish a message somewhere else, for example to log something.
   
   In this PIP we are simply running the function together with the Sink and removing the need for an intermediate topic.
   That said, the function can do whatever it wants.
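The point above is that the PIP runs the function in the same process as the Sink, so no intermediate topic is needed between them. A stdlib-only sketch of that idea follows, assuming hypothetical `SimpleSink`, `TransformingSink` and `ListSink` types (they are not Pulsar's `Sink` or `Function` interfaces): the transform's output is handed directly to the sink's `write` method. Treating a `null` result as "skip this record" is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical minimal sink interface (not org.apache.pulsar.io.core.Sink).
interface SimpleSink<T> {
    void write(T value);
}

// The transform runs in-process, and its output goes straight to sink.write()
// instead of being published to a topic that a separate sink would consume.
final class TransformingSink<I, O> implements SimpleSink<I> {
    private final Function<I, O> transform;
    private final SimpleSink<O> sink;

    TransformingSink(Function<I, O> transform, SimpleSink<O> sink) {
        this.transform = transform;
        this.sink = sink;
    }

    @Override
    public void write(I input) {
        O output = transform.apply(input);
        if (output != null) { // sketch assumption: null means "drop this record"
            sink.write(output);
        }
    }
}

// Collects written values in memory, standing in for a real external system.
final class ListSink<T> implements SimpleSink<T> {
    final List<T> written = new ArrayList<>();

    @Override
    public void write(T value) {
        written.add(value);
    }
}
```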





[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931504298


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   If you have a Function with the Sink, we use the input type of the Function to configure the PulsarSource instead of the Sink type. See [here](https://github.com/apache/pulsar/pull/16740/files#diff-f35e3b19e9f35ef9e868df2020221324c959deb664bff63437bb3a2ddf929397R484-R489).
   Does this answer your concerns?
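When a Function is attached, the Function's input type rather than the Sink's is what configures the `PulsarSource`. One way to obtain such a declared input type in plain Java is to read the function class's generic interface through reflection. The sketch below assumes a hypothetical `UserFunction<I, O>` interface and only handles classes that implement it directly; it is an illustration of the technique, not a description of how Pulsar resolves types internally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical function interface shaped like a Function<I, O> (no Context parameter).
interface UserFunction<I, O> {
    O process(I input);
}

final class InputTypes {
    private InputTypes() {}

    // Returns the declared type argument I of a class that directly implements
    // UserFunction<I, O>. Indirect implementations (via superclasses) are not handled.
    static Type inputTypeOf(Class<?> functionClass) {
        for (Type iface : functionClass.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) iface;
                if (pt.getRawType() == UserFunction.class) {
                    return pt.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException(functionClass + " does not directly implement UserFunction");
    }
}

// Example user function: String in, Integer out.
final class LengthFunction implements UserFunction<String, Integer> {
    @Override
    public Integer process(String input) {
        return input.length();
    }
}
```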





[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931105030


##########
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java:
##########
@@ -37,6 +38,7 @@ public Record<String> process(String input, Context context) throws Exception {
         return context.<String>newOutputRecordBuilder()
                 .destinationTopic(publishTopic)
                 .value(output)
+                .schema(Schema.STRING)

Review Comment:
   I got it, thanks.





[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931813116


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   You can use the same input type as you would for a regular Sink or Function. If you want to consume any type of Schema, then yes, you should use `GenericObject` or `byte[]`. If you know that the Schema will always be String, you can use String as input.





[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1225375437

   /pulsar-bot rerun-failure-checks




[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r954977271


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -383,11 +411,64 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
 
     private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         AbstractSinkRecord<?> sinkRecord;
         if (output instanceof Record) {
-            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+            Record record = (Record) output;

Review Comment:
   done
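The diff above changes `sendOutputMessage` to set the thread context class loader to `componentClassLoader` (the Sink's) instead of `functionClassLoader` before the record reaches the Sink. As a general illustration of scoping such a switch, here is a stdlib sketch that sets a context class loader for the duration of one call and restores the previous loader in a `finally` block. `ClassLoaderSwitch` and `withContextClassLoader` are hypothetical names, and the PR code shown here does not necessarily restore the loader this way.

```java
import java.util.function.Supplier;

final class ClassLoaderSwitch {
    private ClassLoaderSwitch() {}

    // Runs action with loader installed as the thread context class loader and
    // always restores the previous loader afterwards, even if action throws.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```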





[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931093028


##########
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java:
##########
@@ -37,6 +38,7 @@ public Record<String> process(String input, Context context) throws Exception {
         return context.<String>newOutputRecordBuilder()
                 .destinationTopic(publishTopic)
                 .value(output)
+                .schema(Schema.STRING)

Review Comment:
   How can we prevent users from calling these methods:
   - context.getCurrentRecord().ack()
   - context.publish()
   - context.newOutputMessage()
   - context.getPulsarAdmin()
   
   I think that this will affect the functional boundaries provided by the sink itself, e.g. processing semantics.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r956100762


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {
+
+  AtomicLong latestVersion = new AtomicLong(0);
+  ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();

Review Comment:
   good



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {

Review Comment:
   good





[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931138592


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   There is also a question about the input type of user-defined functions. In the current implementation, the consumer in the sink sets the schema according to the generic type of the sink; usually we use `GenericObject`. If the user-defined function does not match that generic type, the current logic goes to the code below instead of sending the record to the sink. Is this the expected behavior?
   
   
   https://github.com/apache/pulsar/blob/5df15dd2edd7eeab309fea35828915c8698ea339/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L354-L360
   
   Maybe we need to validate the input parameter types of the user-defined function when the sink starts?





[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r954678531


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -881,4 +963,98 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
             Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+
+    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {

Review Comment:
   We need these methods because we don't have a topic, so it feels odd to put them in a class named `TopicSchema`.





[GitHub] [pulsar] cbornet commented on pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1196026115

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931093028


##########
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java:
##########
@@ -37,6 +38,7 @@ public Record<String> process(String input, Context context) throws Exception {
         return context.<String>newOutputRecordBuilder()
                 .destinationTopic(publishTopic)
                 .value(output)
+                .schema(Schema.STRING)

Review Comment:
   How can we prevent users from calling these methods:
   - context.getCurrentRecord().ack()
   - context.publish()
   - context.newOutputMessage()
   - context.getPulsarAdmin()
   
   I understand that this will affect the functional boundaries provided by the sink itself, e.g. processing semantics.





[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1226462507

   /pulsar-bot rerun-failure-checks




[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931071756


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   Can we do this check when the sink is created or started?





[GitHub] [pulsar] cbornet closed pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet closed pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks
URL: https://github.com/apache/pulsar/pull/16740




[GitHub] [pulsar] eolivelli merged pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #16740:
URL: https://github.com/apache/pulsar/pull/16740




[GitHub] [pulsar] momo-jun commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1330192492

   @cbornet the milestone tag is still 2.11.
   @Technoboy- just to double-check: is this feature going to be included in 2.11?




[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1231394797

   /pulsar-bot rerun-failure-checks




[GitHub] [pulsar] cbornet closed pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet closed pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks
URL: https://github.com/apache/pulsar/pull/16740




[GitHub] [pulsar] shibd commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931691027


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   Oh, sorry. I missed this.
   
   However, if users want to use it, the input parameter type may need to be defined carefully. In general, users can only use `GenericObject`, which triggers the registration of an `auto_consumer` type schema, because only then can inputs from multiple topics be processed.
   
   Am I right?
   
   





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r931073201


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
         try {
             this.sink.write(sinkRecord);
         } catch (Exception e) {
-            log.info("Encountered exception in sink write: ", e);
+            if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {

Review Comment:
   You can't, because each record can be different.
   
   In Java, even if you declare a class with generic types, you can always force the system to return something of the wrong type.
   So this check must stay here.
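The argument here is that declared generic types cannot be trusted up front, because Java erases type arguments at runtime: a value of the wrong type can travel through code typed as `List<String>` and only fail with a `ClassCastException` at the point where it is actually used. The small stdlib demonstration below (the `ErasureDemo` class is hypothetical and unrelated to Pulsar's code) shows that effect, which is why a per-record check in the write path is still needed.

```java
import java.util.List;

final class ErasureDemo {
    private ErasureDemo() {}

    // Unchecked cast: compiles (with a warning) and succeeds at runtime, because
    // the element type argument is erased and no element is inspected.
    @SuppressWarnings("unchecked")
    static List<String> pretendStrings(List<?> values) {
        return (List<String>) values;
    }

    // Returns true if reading the element as a String throws ClassCastException.
    // The compiler inserts a cast to String at the assignment below, so that is
    // where a wrongly typed element is finally detected.
    static boolean failsOnUse(List<String> values, int index) {
        try {
            String value = values.get(index);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```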





[GitHub] [pulsar] momo-jun commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1386822766

   > Transform functions are not part of 2.11 so we need to wait for the release before doing the doc...
   
   Hi @cbornet, since 2.11 has been released, now is a good time to start adding the docs for this feature to the next version of the docs. Feel free to share your plans and anything I can help with. Thank you.




[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1228744580

   Following feedback, I renamed `preprocess function` to `transform function`.
   cc @shibd @eolivelli @dave2wave @mattisonchao @nlu90 @nicoloboschi @HQebupt @lhotari @michaeljmarshall 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r954978419


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -383,11 +411,64 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
 
     private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         AbstractSinkRecord<?> sinkRecord;
         if (output instanceof Record) {
-            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+            Record record = (Record) output;
+            if (sinkSchemaInfoProvider != null) {
+                // Function and Sink coupled together so we need to encode with the Function Schema
+                // and decode with the Sink schema
+                Schema encodingSchema = record.getSchema();
+                boolean isKeyValueSeparated = false;
+                if (encodingSchema instanceof KeyValueSchema) {
+                    KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) encodingSchema;
+                    // If the encoding is SEPARATED, it's easier to encode/decode with INLINE
+                    // and rebuild the SEPARATED KeyValueSchema after decoding
+                    if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                        encodingSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema());
+                        isKeyValueSeparated = true;
+                    }
+                }
+                byte[] encoded = encodingSchema.encode(record.getValue());
+
+                if (sinkSchema.get() == null) {
+                    Schema<?> schema = getSinkSchema(record, sinkTypeArg);
+                    schema.setSchemaInfoProvider(sinkSchemaInfoProvider);
+                    sinkSchema.compareAndSet(null, schema);
+                }
+                Schema<?> schema = sinkSchema.get();
+                SchemaVersion schemaVersion = sinkSchemaInfoProvider.getSchemaVersion(encodingSchema);
+                final byte[] schemaVersionBytes = schemaVersion.bytes();
+                Object decoded = schema.decode(encoded, schemaVersionBytes);
+
+                if (schema instanceof AutoConsumeSchema) {
+                    schema = ((AutoConsumeSchema) schema).getInternalSchema(schemaVersionBytes);
+                }
+
+                final Schema<?> finalSchema;
+                if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
+                    KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
+                    finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
+                        KeyValueEncodingType.SEPARATED);
+                } else {
+                    finalSchema = schema;
+                }
+
+                sinkRecord = new OutputRecordSinkRecord<>(srcRecord, record) {

Review Comment:
   I changed the code to use distinct constructors instead of an anonymous subclass. It does not introduce a new class, though.
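To illustrate the "distinct constructors" approach, here is a minimal sketch under stated assumptions: `OutputSketch` and `SinkRecordSketch` are hypothetical stand-ins, not Pulsar's `Record` or `OutputRecordSinkRecord`, and schemas are reduced to plain names. One constructor forwards the function output unchanged; the other exposes a value that was already decoded with the sink schema, so no anonymous subclass is needed to override the getters.

```java
// Hypothetical, simplified function output (NOT Pulsar's Record interface).
class OutputSketch {
    final String key;
    final Object value;
    final String schemaName;

    OutputSketch(String key, Object value, String schemaName) {
        this.key = key;
        this.value = value;
        this.schemaName = schemaName;
    }
}

// Hypothetical wrapper using distinct constructors instead of an anonymous subclass.
class SinkRecordSketch {
    private final OutputSketch output;
    private final Object value;
    private final String schemaName;

    // Plain case: expose the function output unchanged.
    SinkRecordSketch(OutputSketch output) {
        this(output, output.value, output.schemaName);
    }

    // Transform-function case: expose the value re-decoded with the sink schema.
    SinkRecordSketch(OutputSketch output, Object decodedValue, String sinkSchemaName) {
        this.output = output;
        this.value = decodedValue;
        this.schemaName = sinkSchemaName;
    }

    // The key always comes from the function output in this sketch.
    String getKey() { return output.key; }

    Object getValue() { return value; }

    String getSchemaName() { return schemaName; }
}
```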



[GitHub] [pulsar] cbornet commented on pull request #16740: [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1196365476

   /pulsarbot rerun-failure-checks


[GitHub] [pulsar] Technoboy- commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1333118532

   > @cbornet the milestone tag is still 2.11. @Technoboy- just a double-check, is this feature going to be included in 2.11?
   
   Correct, it is not included in 2.11.


[GitHub] [pulsar] eolivelli commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r953477852


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -383,11 +411,64 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
 
     private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         AbstractSinkRecord<?> sinkRecord;
         if (output instanceof Record) {
-            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+            Record record = (Record) output;
+            if (sinkSchemaInfoProvider != null) {
+                // Function and Sink coupled together so we need to encode with the Function Schema
+                // and decode with the Sink schema
+                Schema encodingSchema = record.getSchema();
+                boolean isKeyValueSeparated = false;
+                if (encodingSchema instanceof KeyValueSchema) {
+                    KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) encodingSchema;
+                    // If the encoding is SEPARATED, it's easier to encode/decode with INLINE
+                    // and rebuild the SEPARATED KeyValueSchema after decoding
+                    if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                        encodingSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema());
+                        isKeyValueSeparated = true;
+                    }
+                }
+                byte[] encoded = encodingSchema.encode(record.getValue());
+
+                if (sinkSchema.get() == null) {
+                    Schema<?> schema = getSinkSchema(record, sinkTypeArg);
+                    schema.setSchemaInfoProvider(sinkSchemaInfoProvider);
+                    sinkSchema.compareAndSet(null, schema);
+                }
+                Schema<?> schema = sinkSchema.get();
+                SchemaVersion schemaVersion = sinkSchemaInfoProvider.getSchemaVersion(encodingSchema);
+                final byte[] schemaVersionBytes = schemaVersion.bytes();
+                Object decoded = schema.decode(encoded, schemaVersionBytes);
+
+                if (schema instanceof AutoConsumeSchema) {
+                    schema = ((AutoConsumeSchema) schema).getInternalSchema(schemaVersionBytes);
+                }
+
+                final Schema<?> finalSchema;
+                if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
+                    KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
+                    finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
+                        KeyValueEncodingType.SEPARATED);
+                } else {
+                    finalSchema = schema;
+                }
+
+                sinkRecord = new OutputRecordSinkRecord<>(srcRecord, record) {

Review Comment:
   What about creating a named class? It will help Sink developers when they dump the `Record` object (and especially us when we debug problems with this feature :-) ).
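The debugging argument can be checked with plain Java: an anonymous subclass such as `new OutputRecordSinkRecord<>(srcRecord, record) { ... }` gets a synthetic runtime name (something like `JavaInstanceRunnable$1`) and an empty simple name, whereas a named subclass is recognisable in dumps and logs. In this sketch, `BaseRecord`, `TransformedSinkRecord` and `RecordNames` are hypothetical classes:

```java
// Base type whose toString() prints the runtime class name, as a debug dump might.
class BaseRecord {
    @Override
    public String toString() {
        return getClass().getName() + "{...}";
    }
}

// Hypothetical named subclass: its name shows up as-is in logs and heap dumps.
class TransformedSinkRecord extends BaseRecord {
}

class RecordNames {
    // Same shape as `new X(...) { ... }`: the compiler generates a synthetic name.
    static BaseRecord anonymous() {
        return new BaseRecord() { };
    }

    static BaseRecord named() {
        return new TransformedSinkRecord();
    }
}
```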



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -383,11 +411,64 @@ void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception
 
     private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         AbstractSinkRecord<?> sinkRecord;
         if (output instanceof Record) {
-            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+            Record record = (Record) output;

Review Comment:
   What about moving this block to a method?
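One way to extract the block into its own method is sketched below. It mirrors only the control flow visible in the diff: encode with the function schema (switching a SEPARATED key/value schema to INLINE first), lazily create the sink schema once via `AtomicReference.compareAndSet`, decode with it, then restore SEPARATED on the result. `KvEncoding`, `ToySchema`, `Transcoded`, `SinkTranscoder` and `transcode` are hypothetical; they do not use Pulsar's `Schema`, `KeyValueSchema` or `SchemaInfoProvider` APIs, and schema versions are left out.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical key/value encoding flag, mirroring INLINE vs SEPARATED.
enum KvEncoding { INLINE, SEPARATED }

// Hypothetical toy schema (NOT Pulsar's Schema API); kvEncoding is null for non key/value schemas.
final class ToySchema {
    final String name;
    final KvEncoding kvEncoding;
    final Function<Object, byte[]> encoder;
    final Function<byte[], Object> decoder;

    ToySchema(String name, KvEncoding kvEncoding,
              Function<Object, byte[]> encoder, Function<byte[], Object> decoder) {
        this.name = name;
        this.kvEncoding = kvEncoding;
        this.encoder = encoder;
        this.decoder = decoder;
    }

    // Same schema with a different key/value encoding type.
    ToySchema withEncoding(KvEncoding encoding) {
        return new ToySchema(name, encoding, encoder, decoder);
    }
}

// Decoded value plus the schema the sink should see.
final class Transcoded {
    final Object value;
    final ToySchema schema;

    Transcoded(Object value, ToySchema schema) {
        this.value = value;
        this.schema = schema;
    }
}

final class SinkTranscoder {
    // Created on first use; compareAndSet keeps the first instance if two threads race.
    private final AtomicReference<ToySchema> sinkSchema = new AtomicReference<>();
    private final Supplier<ToySchema> sinkSchemaFactory;

    SinkTranscoder(Supplier<ToySchema> sinkSchemaFactory) {
        this.sinkSchemaFactory = sinkSchemaFactory;
    }

    // Encode with the function's schema, decode with the sink's schema.
    Transcoded transcode(Object value, ToySchema functionSchema) {
        ToySchema encodingSchema = functionSchema;
        boolean separated = false;
        // Round-trip SEPARATED through INLINE and restore it after decoding.
        if (encodingSchema.kvEncoding == KvEncoding.SEPARATED) {
            encodingSchema = encodingSchema.withEncoding(KvEncoding.INLINE);
            separated = true;
        }
        byte[] encoded = encodingSchema.encoder.apply(value);

        if (sinkSchema.get() == null) {
            sinkSchema.compareAndSet(null, sinkSchemaFactory.get());
        }
        ToySchema schema = sinkSchema.get();
        Object decoded = schema.decoder.apply(encoded);

        ToySchema finalSchema = separated && schema.kvEncoding != null
                ? schema.withEncoding(KvEncoding.SEPARATED)
                : schema;
        return new Transcoded(decoded, finalSchema);
    }
}
```

Keeping the logic in one method with a small result type also makes it testable without a running function instance, which is one argument for the extraction.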



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {
+
+  AtomicLong latestVersion = new AtomicLong(0);
+  ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();

Review Comment:
   From reading the code, it is not very clear to me who populates these maps.
   Maybe we could add explicit accessors?
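The "who populates these maps" question can be answered in code by routing every write through a single method and exposing reads through accessors. A minimal sketch, assuming schemas are identified by a String definition (the real class imports `SchemaInfo`, `SchemaHash` and `SchemaVersion`); `LocalSchemaRegistry`, `getOrRegisterVersion` and `getDefinition` are hypothetical names, not the PR's code:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory registry: versions are assigned locally, starting at 0.
final class LocalSchemaRegistry {
    private final AtomicLong latestVersion = new AtomicLong(0);
    private final ConcurrentHashMap<String, Long> versionsByDefinition = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, String> definitionsByVersion = new ConcurrentHashMap<>();

    // The only write path: returns the existing version, or assigns the next one.
    long getOrRegisterVersion(String definition) {
        return versionsByDefinition.computeIfAbsent(definition, d -> {
            long version = latestVersion.getAndIncrement();
            definitionsByVersion.put(version, d);
            return version;
        });
    }

    // Read-only accessor: empty if the version was never registered.
    Optional<String> getDefinition(long version) {
        return Optional.ofNullable(definitionsByVersion.get(version));
    }
}
```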



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -881,4 +963,98 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
             Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+
+    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {

Review Comment:
   We could move these 3 methods to another class that does similar things, like TopicSchema,
   but I have no strong opinion.



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {

Review Comment:
   What about adding a Javadoc here?



[GitHub] [pulsar] cbornet commented on a diff in pull request #16740: [PIP193] [feature][connectors] Add support for a preprocess Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on code in PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#discussion_r954975938


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class SinkSchemaInfoProvider implements SchemaInfoProvider {

Review Comment:
   done



[GitHub] [pulsar] cbornet commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1231331791

   /pulsar-bot rerun-failure-checks


[GitHub] [pulsar] cbornet closed pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
cbornet closed pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks
URL: https://github.com/apache/pulsar/pull/16740


[GitHub] [pulsar] momo-jun commented on pull request #16740: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks

Posted by GitBox <gi...@apache.org>.
momo-jun commented on PR #16740:
URL: https://github.com/apache/pulsar/pull/16740#issuecomment-1255896222

   @cbornet It would help users a lot if you could add the docs for this feature. Do you have any plans for that?

